From 715c356d7f74bda9f7e4bec96472ae34b23040c7 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Thu, 5 Mar 2026 07:34:59 +0000 Subject: [PATCH 1/3] adding discardOutOfOrder field in write request Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 1 + pkg/cortexpb/cortex.pb.go | 241 ++++++++++++++++++------------ pkg/cortexpb/cortex.proto | 2 + pkg/cortexpb/timeseries.go | 1 + pkg/distributor/distributor.go | 12 +- pkg/ingester/ingester.go | 7 + pkg/ingester/ingester_ooo_test.go | 205 +++++++++++++++++++++++++ pkg/ruler/compat.go | 11 +- pkg/ruler/compat_ooo_test.go | 93 ++++++++++++ 9 files changed, 472 insertions(+), 101 deletions(-) create mode 100644 pkg/ingester/ingester_ooo_test.go create mode 100644 pkg/ruler/compat_ooo_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 16b1d37b5ed..d8f6b89cf8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ * [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 * [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246 * [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253 +* [ENHANCEMENT] discard ooo samples in some special cases #7227 * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 diff --git a/pkg/cortexpb/cortex.pb.go b/pkg/cortexpb/cortex.pb.go index e3bd3e51bf3..cb87faedba3 100644 --- a/pkg/cortexpb/cortex.pb.go +++ b/pkg/cortexpb/cortex.pb.go @@ -192,6 +192,8 @@ type WriteRequest struct { Metadata []*MetricMetadata `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty"` SkipLabelNameValidation bool `protobuf:"varint,1000,opt,name=skip_label_name_validation,json=skipLabelNameValidation,proto3" json:"skip_label_name_validation,omitempty"` MessageWithBufRef `protobuf:"bytes,1001,opt,name=Ref,proto3,embedded=Ref,customtype=MessageWithBufRef" json:"Ref"` + // When true, indicates that out-of-order samples should be discarded even if OOO is enabled. + DiscardOutOfOrder bool `protobuf:"varint,1002,opt,name=discard_out_of_order,json=discardOutOfOrder,proto3" json:"discard_out_of_order,omitempty"` } func (m *WriteRequest) Reset() { *m = WriteRequest{} } @@ -247,6 +249,13 @@ func (m *WriteRequest) GetSkipLabelNameValidation() bool { return false } +func (m *WriteRequest) GetDiscardOutOfOrder() bool { + if m != nil { + return m.DiscardOutOfOrder + } + return false +} + // refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto // The histogram and Sample are shared with PRW1. type WriteRequestV2 struct { @@ -1358,100 +1367,102 @@ func init() { func init() { proto.RegisterFile("cortex.proto", fileDescriptor_893a47d0a749d749) } var fileDescriptor_893a47d0a749d749 = []byte{ - // 1477 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcd, 0x6f, 0x1b, 0x45, - 0x14, 0xf7, 0xf8, 0x7b, 0x9f, 0x3f, 0xba, 0x99, 0xba, 0xed, 0x36, 0x6d, 0xd7, 0xa9, 0x2b, 0x20, - 0x2a, 0x55, 0x40, 0xa9, 0x28, 0x50, 0x55, 0x48, 0x76, 0xea, 0x34, 0x56, 0x6b, 0x27, 0x1a, 0x3b, - 0xa9, 0xca, 0xc5, 0xda, 0x38, 0xe3, 0x78, 0xd5, 0xfd, 0x30, 0x3b, 0xeb, 0xaa, 0xe1, 0xc4, 0x09, - 0x71, 0xe4, 0xc2, 0x85, 0x1b, 0xe2, 0xc2, 0x95, 0x33, 0xff, 0x40, 0x8f, 0xb9, 0x51, 0x55, 0x22, - 0xa2, 0xee, 0xa5, 0xdc, 0x7a, 0x80, 0x3b, 0x9a, 0xd9, 0x4f, 0xc7, 0xa9, 0x0a, 0xa8, 0x07, 0x6e, - 0x33, 0xbf, 0xf7, 0x66, 0xe6, 0x37, 0x6f, 0x7e, 0xef, 0xb7, 0x36, 0x14, 0x07, 0xb6, 0xe3, 0xd2, - 0xc7, 0x2b, 0x63, 0xc7, 0x76, 0x6d, 0x9c, 0xf7, 0x66, 0xe3, 0xdd, 0xc5, 0xca, 0xbe, 0xbd, 0x6f, - 0x0b, 0xf0, 0x03, 0x3e, 0xf2, 0xe2, 0xb5, 0xf3, 0xb0, 0xd0, 0xa6, 0x8c, 0x69, 0xfb, 0xf4, 0xbe, - 0xee, 0x8e, 0x1a, 0x93, 0x21, 0xa1, 0xc3, 0x9b, 0xe9, 0x57, 0x3f, 0x54, 0x13, 0xb5, 0x5f, 0x93, - 0x50, 0xbc, 0xef, 0xe8, 0x2e, 0x25, 0xf4, 0x8b, 0x09, 0x65, 0x2e, 0xde, 0x02, 0x70, 0x75, 0x93, - 0x32, 0xea, 0xe8, 0x94, 0x29, 0x68, 0x29, 0xb5, 0x5c, 0x58, 0xad, 0xac, 0x04, 0x07, 0xac, 0xf4, - 0x74, 0x93, 0x76, 0x45, 0xac, 0xb1, 0xf8, 0xe4, 0xa8, 0x9a, 0x78, 0x76, 0x54, 0xc5, 0x5b, 0x0e, - 0xd5, 0x0c, 0xc3, 0x1e, 0xf4, 0xc2, 0x75, 0x24, 0xb6, 0x07, 0xbe, 0x06, 0xd9, 0xae, 0x3d, 0x71, - 0x06, 0x54, 0x49, 0x2e, 0xa1, 0xe5, 0x72, 0x7c, 0x37, 0x0f, 0x6f, 0x5a, 0x13, 0x93, 0xf8, 0x39, - 0xf8, 0x26, 0xe4, 0x4d, 0xea, 0x6a, 0x7b, 0x9a, 0xab, 0x29, 0x29, 0x71, 0xba, 0x12, 0xe5, 0xb7, - 0xa9, 0xeb, 0xe8, 0x83, 0xb6, 0x1f, 0x6f, 0xa4, 0x9f, 0x1c, 0x55, 0x11, 0x09, 0xf3, 0xf1, 0x2d, - 0x58, 0x64, 0x0f, 0xf5, 0x71, 0xdf, 0xd0, 0x76, 0xa9, 0xd1, 0xb7, 0x34, 0x93, 0xf6, 0x1f, 0x69, - 0x86, 0xbe, 0xa7, 0xb9, 0xba, 0x6d, 0x29, 0x2f, 0x73, 0x4b, 0x68, 0x39, 0x4f, 0xce, 0xf1, 0x94, - 0x7b, 0x3c, 0xa3, 0xa3, 0x99, 0x74, 0x27, 0x8c, 0xe3, 0x36, 0xa4, 0x08, 0x1d, 0x2a, 0x7f, 0xf0, - 0xb4, 0xc2, 0xea, 0x85, 0xf8, 0xa9, 0xc7, 0x6a, 0xd7, 0xb8, 0xc4, 0xaf, 0x7e, 0x78, 0x54, 0x45, - 0xcf, 0x8e, 0xaa, 0xf3, 0xa5, 0x25, 0x7c, 0x9f, 0xda, 0x2f, 0x49, 0x28, 0xc7, 0x2b, 0xbb, 0xb3, - 0x8a, 0x15, 0xc8, 0xb1, 0x03, 0x73, 0xd7, 0x36, 0x98, 0x92, 0x5e, 0x4a, 0x2d, 0x4b, 0x24, 0x98, - 0xe2, 0xde, 0x4c, 0xd5, 0x33, 0xe2, 0xde, 0x67, 0x4f, 0xaa, 0xfa, 0xce, 0x6a, 0xe3, 0xa2, 0x5f, - 0xf7, 0xca, 0x7c, 0xdd, 0x77, 0x56, 0x5f, 0x53, 0xf9, 0xec, 0x3f, 0xa8, 0xfc, 0xff, 0xad, 0x7a, - 0xc5, 0xf8, 0xad, 0x71, 0x15, 0x0a, 0x82, 0x18, 0xeb, 0x3b, 0x74, 0xe8, 0x09, 0xb3, 0x44, 0xc0, - 0x83, 0x08, 0x1d, 0x32, 0xfc, 0x21, 0xe4, 0x98, 0x66, 0x8e, 0x0d, 0xca, 0x94, 0xa4, 0xa8, 0x9f, - 0x1c, 0xbb, 0xad, 0x08, 0x08, 0xbd, 0x24, 0x48, 0x90, 0x86, 0x3f, 0x05, 0x18, 0xe9, 0xcc, 0xb5, - 0xf7, 0x1d, 0xcd, 0x64, 0xbe, 0xd8, 0x4e, 0x47, 0x8b, 0x36, 0x82, 0x98, 0xbf, 0x2e, 0x96, 0x8c, - 0x3f, 0x01, 0x89, 0x3e, 0xa6, 0xe6, 0xd8, 0xd0, 0x1c, 0xef, 0x2d, 0x67, 0x9a, 0xa4, 0xe9, 0x87, - 0x76, 0x56, 0xfd, 0xa5, 0x51, 0x32, 0xbe, 0x11, 0xd3, 0x77, 0x46, 0xd4, 0xaa, 0x32, 0xa3, 0x6f, - 0x11, 0x09, 0x17, 0x46, 0xda, 0x7e, 0x1f, 0x16, 0x06, 0x0e, 0xd5, 0x5c, 0xba, 0xd7, 0x17, 0x2f, - 0xec, 0x6a, 0xe6, 0x58, 0x3c, 0x6b, 0x8a, 0xc8, 0x7e, 0xa0, 0x17, 0xe0, 0x35, 0x0d, 0x20, 0xe2, - 0xf0, 0xe6, 0xd2, 0x55, 0x20, 0xf3, 0x48, 0x33, 0x26, 0x5e, 0x83, 0x22, 0xe2, 0x4d, 0xf0, 0x45, - 0x90, 0xa2, 0x93, 0x52, 0xe2, 0xa4, 0x08, 0xe0, 0xc6, 0x01, 0x11, 0x5d, 0x7c, 0x1d, 0xd2, 0xee, - 0xc1, 0x98, 0x2a, 0x48, 0x08, 0xad, 0x7a, 0xd2, 0x95, 0xfc, 0xee, 0xed, 0x1d, 0x8c, 0x29, 0x11, - 0xc9, 0xf8, 0x3c, 0xe4, 0x47, 0xd4, 0x18, 0x73, 0x5a, 0xe2, 0x80, 0x12, 0xc9, 0xf1, 0x39, 0xa1, - 0x43, 0x1e, 0x9a, 0x58, 0xba, 0x2b, 0x42, 0x69, 0x2f, 0xc4, 0xe7, 0x5c, 0x1a, 0xbf, 0x21, 0x71, - 0xb2, 0xbf, 0x15, 0xbe, 0x00, 0xe7, 0xda, 0xcd, 0x1e, 0x69, 0xad, 0xf5, 0x7b, 0x0f, 0xb6, 0x9a, - 0xfd, 0xed, 0x4e, 0x77, 0xab, 0xb9, 0xd6, 0x5a, 0x6f, 0x35, 0x6f, 0xcb, 0x09, 0x7c, 0x0e, 0x4e, - 0xc7, 0x83, 0x6b, 0x9b, 0xdb, 0x9d, 0x5e, 0x93, 0xc8, 0x08, 0x9f, 0x81, 0x85, 0x78, 0xe0, 0x4e, - 0x7d, 0xfb, 0x4e, 0x53, 0x4e, 0xe2, 0xf3, 0x70, 0x26, 0x0e, 0x6f, 0xb4, 0xba, 0xbd, 0xcd, 0x3b, - 0xa4, 0xde, 0x96, 0x53, 0x58, 0x85, 0xc5, 0xb9, 0x15, 0x51, 0x3c, 0x7d, 0xfc, 0xa8, 0xee, 0x76, - 0xbb, 0x5d, 0x27, 0x0f, 0xe4, 0x0c, 0xae, 0x80, 0x1c, 0x0f, 0xb4, 0x3a, 0xeb, 0x9b, 0x72, 0x16, - 0x2b, 0x50, 0x99, 0x49, 0xef, 0xd5, 0x7b, 0xcd, 0x6e, 0xb3, 0x27, 0xe7, 0x6a, 0x3f, 0x23, 0xc0, - 0x5d, 0xd7, 0xa1, 0x9a, 0x39, 0x63, 0xcc, 0x8b, 0x90, 0xef, 0x51, 0x4b, 0xb3, 0xdc, 0xd6, 0x6d, - 0x51, 0x65, 0x89, 0x84, 0x73, 0xae, 0x7d, 0x3f, 0x4d, 0x3c, 0xe1, 0x8c, 0x77, 0xc4, 0x37, 0x21, - 0x41, 0x5a, 0xd0, 0xae, 0x2f, 0xdf, 0x52, 0xbb, 0x7e, 0x87, 0xa0, 0xe4, 0x1f, 0xc4, 0xc6, 0xb6, - 0xc5, 0x28, 0xc6, 0x90, 0x1e, 0xd8, 0x7b, 0x9e, 0x20, 0x32, 0x44, 0x8c, 0xb9, 0xff, 0x99, 0xde, - 0x7a, 0x41, 0x53, 0x22, 0xc1, 0x94, 0x47, 0xba, 0x7e, 0xf3, 0x7a, 0x4a, 0x0b, 0xa6, 0x58, 0x05, - 0xd8, 0x88, 0x9a, 0x34, 0x2d, 0x82, 0x31, 0x84, 0xab, 0xb4, 0x19, 0x76, 0x62, 0xc6, 0x53, 0x69, - 0x08, 0xd4, 0xfe, 0x44, 0x00, 0x91, 0x8d, 0xe0, 0x3a, 0x64, 0x3d, 0xd9, 0xfb, 0x1f, 0xb6, 0x58, - 0xb7, 0x0b, 0x4f, 0xdb, 0xd2, 0x74, 0xa7, 0x51, 0xf1, 0xfd, 0xb5, 0x28, 0xa0, 0xfa, 0x9e, 0x36, - 0x76, 0xa9, 0x43, 0xfc, 0x85, 0xff, 0xc1, 0x66, 0x6e, 0xc4, 0xbd, 0xc2, 0x73, 0x19, 0x3c, 0xef, - 0x15, 0xf3, 0x4e, 0x31, 0x6b, 0x4f, 0xe9, 0x7f, 0x61, 0x4f, 0xb5, 0x8f, 0x40, 0x0a, 0xef, 0xc3, - 0x5f, 0x82, 0x9b, 0xb9, 0x78, 0x89, 0x22, 0x11, 0xe3, 0xd9, 0x8e, 0x2f, 0xfa, 0x1d, 0x5f, 0xab, - 0x43, 0xd6, 0xbb, 0x42, 0x14, 0x47, 0x71, 0x47, 0xb8, 0x0c, 0xc5, 0xd0, 0x00, 0xfa, 0x26, 0x13, - 0x8b, 0x53, 0xa4, 0x10, 0x62, 0x6d, 0x56, 0xfb, 0x3e, 0x09, 0xe5, 0xd9, 0xaf, 0x34, 0xfe, 0x78, - 0xc6, 0x1a, 0xae, 0xbc, 0xee, 0x6b, 0x3e, 0x6f, 0x0f, 0xd7, 0x00, 0x9b, 0x02, 0xeb, 0x0f, 0x35, - 0x53, 0x37, 0x0e, 0xc4, 0x37, 0xc9, 0x57, 0x8e, 0xec, 0x45, 0xd6, 0x45, 0x80, 0x7f, 0x8a, 0xf8, - 0x35, 0xb9, 0x79, 0x08, 0x89, 0x48, 0x44, 0x8c, 0x39, 0xc6, 0x5d, 0x43, 0xe8, 0x42, 0x22, 0x62, - 0x5c, 0x3b, 0x98, 0x71, 0x8f, 0x02, 0xe4, 0xb6, 0x3b, 0x77, 0x3b, 0x9b, 0xf7, 0x3b, 0x72, 0x82, - 0x4f, 0x22, 0x87, 0x90, 0x20, 0x13, 0xb8, 0x42, 0x09, 0xa4, 0xb8, 0x13, 0x60, 0x28, 0xcf, 0x75, - 0x7f, 0x01, 0x72, 0x51, 0xc7, 0xe7, 0x21, 0xed, 0x77, 0x79, 0x11, 0xf2, 0xb1, 0xce, 0xbe, 0x0b, - 0x59, 0xef, 0xe8, 0xb7, 0x20, 0xc4, 0xda, 0xd7, 0x08, 0xf2, 0x81, 0x78, 0xde, 0x86, 0xb0, 0x4f, - 0xfe, 0x08, 0x1c, 0x7f, 0xf2, 0xd4, 0xfc, 0x93, 0xff, 0x95, 0x01, 0x29, 0x14, 0x23, 0xbe, 0x04, - 0xd2, 0xc0, 0x9e, 0x58, 0x6e, 0x5f, 0xb7, 0x5c, 0xf1, 0xe4, 0xe9, 0x8d, 0x04, 0xc9, 0x0b, 0xa8, - 0x65, 0xb9, 0xf8, 0x32, 0x14, 0xbc, 0xf0, 0xd0, 0xb0, 0x35, 0xcf, 0xad, 0xd0, 0x46, 0x82, 0x80, - 0x00, 0xd7, 0x39, 0x86, 0x65, 0x48, 0xb1, 0x89, 0x29, 0x4e, 0x42, 0x84, 0x0f, 0xf1, 0x59, 0xc8, - 0xb2, 0xc1, 0x88, 0x9a, 0x9a, 0x78, 0xdc, 0x05, 0xe2, 0xcf, 0xf0, 0x3b, 0x50, 0xfe, 0x92, 0x3a, - 0x76, 0xdf, 0x1d, 0x39, 0x94, 0x8d, 0x6c, 0x63, 0x4f, 0x3c, 0x34, 0x22, 0x25, 0x8e, 0xf6, 0x02, - 0x10, 0xbf, 0xeb, 0xa7, 0x45, 0xbc, 0xb2, 0x82, 0x17, 0x22, 0x45, 0x8e, 0xaf, 0x05, 0xdc, 0xae, - 0x82, 0x1c, 0xcb, 0xf3, 0x08, 0xe6, 0x04, 0x41, 0x44, 0xca, 0x61, 0xa6, 0x47, 0xb2, 0x0e, 0x65, - 0x8b, 0xee, 0x6b, 0xae, 0xfe, 0x88, 0xf6, 0xd9, 0x58, 0xb3, 0x98, 0x92, 0x3f, 0xfe, 0x2b, 0xa0, - 0x31, 0x19, 0x3c, 0xa4, 0x6e, 0x77, 0xac, 0x59, 0x7e, 0x87, 0x96, 0x82, 0x15, 0x1c, 0x63, 0xf8, - 0x3d, 0x38, 0x15, 0x6e, 0xb1, 0x47, 0x0d, 0x57, 0x63, 0x8a, 0xb4, 0x94, 0x5a, 0xc6, 0x24, 0xdc, - 0xf9, 0xb6, 0x40, 0x67, 0x12, 0x05, 0x37, 0xa6, 0xc0, 0x52, 0x6a, 0x19, 0x45, 0x89, 0x82, 0x18, - 0xb7, 0xb7, 0xf2, 0xd8, 0x66, 0x7a, 0x8c, 0x54, 0xe1, 0xcd, 0xa4, 0x82, 0x15, 0x21, 0xa9, 0x70, - 0x0b, 0x9f, 0x54, 0xd1, 0x23, 0x15, 0xc0, 0x11, 0xa9, 0x30, 0xd1, 0x27, 0x55, 0xf2, 0x48, 0x05, - 0xb0, 0x4f, 0xea, 0x16, 0x80, 0x43, 0x19, 0x75, 0xfb, 0x23, 0x5e, 0xf9, 0xb2, 0x30, 0x81, 0x4b, - 0x27, 0xd8, 0xd8, 0x0a, 0xe1, 0x59, 0x1b, 0xba, 0xe5, 0x12, 0xc9, 0x09, 0x86, 0x73, 0xfa, 0x3b, - 0x35, 0xa7, 0x3f, 0x7c, 0x05, 0x4a, 0x83, 0x09, 0x73, 0x6d, 0xb3, 0x2f, 0x24, 0xcb, 0x14, 0x59, - 0xf0, 0x28, 0x7a, 0xe0, 0x8e, 0xc0, 0x6a, 0x37, 0x41, 0x0a, 0xf7, 0x9f, 0x6d, 0xfa, 0x1c, 0xa4, - 0x1e, 0x34, 0xbb, 0x32, 0xc2, 0x59, 0x48, 0x76, 0x36, 0xe5, 0x64, 0xd4, 0xf8, 0xa9, 0xc5, 0xf4, - 0x37, 0x3f, 0xaa, 0xa8, 0x91, 0x83, 0x8c, 0xb8, 0x61, 0xa3, 0x08, 0x10, 0x09, 0xa4, 0x76, 0x0b, - 0x20, 0xaa, 0x26, 0xd7, 0xa8, 0x3d, 0x1c, 0x32, 0xea, 0x89, 0x7e, 0x81, 0xf8, 0x33, 0x8e, 0x1b, - 0xd4, 0xda, 0x77, 0x47, 0x42, 0xeb, 0x25, 0xe2, 0xcf, 0xae, 0x56, 0x01, 0xa2, 0xdf, 0xe0, 0x9c, - 0x44, 0x7d, 0xab, 0x25, 0x27, 0xb8, 0x75, 0x90, 0xed, 0x7b, 0x4d, 0x19, 0x35, 0x3e, 0x3b, 0x7c, - 0xae, 0x26, 0x9e, 0x3e, 0x57, 0x13, 0xaf, 0x9e, 0xab, 0xe8, 0xab, 0xa9, 0x8a, 0x7e, 0x9a, 0xaa, - 0xe8, 0xc9, 0x54, 0x45, 0x87, 0x53, 0x15, 0xfd, 0x3e, 0x55, 0xd1, 0xcb, 0xa9, 0x9a, 0x78, 0x35, - 0x55, 0xd1, 0xb7, 0x2f, 0xd4, 0xc4, 0xe1, 0x0b, 0x35, 0xf1, 0xf4, 0x85, 0x9a, 0xf8, 0x3c, 0xfc, - 0x2b, 0xb8, 0x9b, 0x15, 0xff, 0xfd, 0xae, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x68, 0x4f, 0xef, - 0xc4, 0x2b, 0x0e, 0x00, 0x00, + // 1517 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcd, 0x6f, 0x13, 0xc7, + 0x1b, 0xf6, 0xfa, 0xdb, 0xaf, 0x3f, 0xd8, 0x0c, 0x06, 0x36, 0x01, 0xd6, 0xc1, 0xe8, 0xf7, 0x6b, + 0x44, 0x51, 0x8a, 0x82, 0x4a, 0x5b, 0x84, 0x2a, 0xd9, 0xc1, 0x21, 0x16, 0xd8, 0x8e, 0xc6, 0x4e, + 0x10, 0xbd, 0xac, 0x36, 0xf6, 0x38, 0x5e, 0xe1, 0xdd, 0x75, 0x77, 0xc6, 0x88, 0xf4, 0xd4, 0x53, + 0xd5, 0xde, 0x7a, 0xe9, 0xa5, 0xb7, 0xaa, 0x97, 0x5e, 0x7b, 0xee, 0x3f, 0xc0, 0x31, 0xb7, 0x22, + 0xa4, 0x46, 0x25, 0x5c, 0x68, 0x4f, 0x1c, 0xda, 0x7b, 0x35, 0xb3, 0x9f, 0x8e, 0x83, 0x68, 0x2b, + 0x0e, 0xbd, 0xcd, 0x3c, 0xef, 0x3b, 0x33, 0xcf, 0xcc, 0xfb, 0xbc, 0xcf, 0xda, 0x50, 0xe8, 0xdb, + 0x0e, 0x23, 0x8f, 0x57, 0x27, 0x8e, 0xcd, 0x6c, 0x94, 0x75, 0x67, 0x93, 0xdd, 0xa5, 0xf2, 0x9e, + 0xbd, 0x67, 0x0b, 0xf0, 0x3d, 0x3e, 0x72, 0xe3, 0xd5, 0x45, 0x58, 0x68, 0x11, 0x4a, 0xf5, 0x3d, + 0x72, 0xdf, 0x60, 0xa3, 0xfa, 0x74, 0x88, 0xc9, 0xf0, 0x66, 0xf2, 0xd5, 0x77, 0x95, 0x58, 0xf5, + 0xab, 0x04, 0x14, 0xee, 0x3b, 0x06, 0x23, 0x98, 0x7c, 0x3a, 0x25, 0x94, 0xa1, 0x2d, 0x00, 0x66, + 0x98, 0x84, 0x12, 0xc7, 0x20, 0x54, 0x91, 0x96, 0x13, 0x2b, 0xf9, 0xb5, 0xf2, 0xaa, 0x7f, 0xc0, + 0x6a, 0xcf, 0x30, 0x49, 0x57, 0xc4, 0xea, 0x4b, 0x4f, 0x0e, 0x2b, 0xb1, 0x67, 0x87, 0x15, 0xb4, + 0xe5, 0x10, 0x7d, 0x3c, 0xb6, 0xfb, 0xbd, 0x60, 0x1d, 0x8e, 0xec, 0x81, 0xae, 0x42, 0xba, 0x6b, + 0x4f, 0x9d, 0x3e, 0x51, 0xe2, 0xcb, 0xd2, 0x4a, 0x29, 0xba, 0x9b, 0x8b, 0x37, 0xac, 0xa9, 0x89, + 0xbd, 0x1c, 0x74, 0x13, 0xb2, 0x26, 0x61, 0xfa, 0x40, 0x67, 0xba, 0x92, 0x10, 0xa7, 0x2b, 0x61, + 0x7e, 0x8b, 0x30, 0xc7, 0xe8, 0xb7, 0xbc, 0x78, 0x3d, 0xf9, 0xe4, 0xb0, 0x22, 0xe1, 0x20, 0x1f, + 0xdd, 0x82, 0x25, 0xfa, 0xd0, 0x98, 0x68, 0x63, 0x7d, 0x97, 0x8c, 0x35, 0x4b, 0x37, 0x89, 0xf6, + 0x48, 0x1f, 0x1b, 0x03, 0x9d, 0x19, 0xb6, 0xa5, 0xbc, 0xcc, 0x2c, 0x4b, 0x2b, 0x59, 0x7c, 0x8e, + 0xa7, 0xdc, 0xe3, 0x19, 0x6d, 0xdd, 0x24, 0x3b, 0x41, 0x1c, 0xb5, 0x20, 0x81, 0xc9, 0x50, 0xf9, + 0x8d, 0xa7, 0xe5, 0xd7, 0xce, 0x47, 0x4f, 0x3d, 0xf6, 0x76, 0xf5, 0x8b, 0xfc, 0xea, 0x07, 0x87, + 0x15, 0xe9, 0xd9, 0x61, 0x65, 0xfe, 0x69, 0x31, 0xdf, 0x07, 0x5d, 0x83, 0xf2, 0xc0, 0xa0, 0x7d, + 0xdd, 0x19, 0x68, 0xf6, 0x94, 0x69, 0xf6, 0x50, 0xb3, 0x9d, 0x01, 0x71, 0x94, 0xdf, 0x5d, 0x1a, + 0x0b, 0x5e, 0xb0, 0x33, 0x65, 0x9d, 0x61, 0x87, 0x47, 0xaa, 0x3f, 0xc5, 0xa1, 0x14, 0xad, 0xc5, + 0xce, 0x1a, 0x52, 0x20, 0x43, 0xf7, 0xcd, 0x5d, 0x7b, 0x4c, 0x95, 0xe4, 0x72, 0x62, 0x25, 0x87, + 0xfd, 0x29, 0xea, 0xcd, 0xd4, 0x29, 0x25, 0x5e, 0xea, 0xec, 0x49, 0x75, 0xda, 0x59, 0xab, 0x5f, + 0xf0, 0x2a, 0x55, 0x9e, 0xaf, 0xd4, 0xce, 0xda, 0x6b, 0x6a, 0x95, 0xfe, 0x1b, 0xb5, 0xfa, 0x2f, + 0xbd, 0x37, 0x7f, 0xbd, 0x42, 0xf4, 0xd6, 0xa8, 0x02, 0x79, 0x41, 0x8c, 0x6a, 0x0e, 0x19, 0xba, + 0x52, 0x2e, 0x62, 0x70, 0x21, 0x4c, 0x86, 0x14, 0x5d, 0x83, 0x0c, 0xd5, 0xcd, 0xc9, 0x98, 0x50, + 0x25, 0x2e, 0xde, 0x4f, 0x8e, 0xdc, 0x56, 0x04, 0x84, 0xc2, 0x62, 0xd8, 0x4f, 0x43, 0x1f, 0x01, + 0x8c, 0x0c, 0xca, 0xec, 0x3d, 0x47, 0x37, 0xa9, 0x27, 0xcf, 0xd3, 0xe1, 0xa2, 0x4d, 0x3f, 0xe6, + 0xad, 0x8b, 0x24, 0xa3, 0x0f, 0x21, 0x47, 0x1e, 0x13, 0x73, 0x32, 0xd6, 0x1d, 0xb7, 0x96, 0x33, + 0x6d, 0xd5, 0xf0, 0x42, 0x3b, 0x6b, 0xde, 0xd2, 0x30, 0x19, 0xdd, 0x88, 0x74, 0x44, 0x4a, 0xbc, + 0x55, 0x79, 0xa6, 0x23, 0x44, 0x24, 0x58, 0x18, 0x76, 0xc3, 0xbb, 0xb0, 0xd0, 0x77, 0x88, 0xce, + 0xc8, 0x40, 0x13, 0x15, 0x66, 0xba, 0x39, 0x11, 0x65, 0x4d, 0x60, 0xd9, 0x0b, 0xf4, 0x7c, 0xbc, + 0xaa, 0x03, 0x84, 0x1c, 0xde, 0xfc, 0x74, 0x65, 0x48, 0x3d, 0xd2, 0xc7, 0x53, 0xb7, 0xa5, 0x25, + 0xec, 0x4e, 0xd0, 0x05, 0xc8, 0x85, 0x27, 0x25, 0xc4, 0x49, 0x21, 0x50, 0xfd, 0x39, 0x0e, 0x10, + 0xd2, 0x45, 0xd7, 0x21, 0xc9, 0xf6, 0x27, 0x44, 0x91, 0x84, 0xd0, 0x2a, 0x27, 0x5d, 0xc9, 0xeb, + 0xf7, 0xde, 0xfe, 0x84, 0x60, 0x91, 0x8c, 0x16, 0x21, 0x3b, 0x22, 0xe3, 0x09, 0xa7, 0x25, 0x0e, + 0x28, 0xe2, 0x0c, 0x9f, 0xf3, 0x7e, 0x5b, 0x84, 0xec, 0xd4, 0x32, 0x98, 0x08, 0x25, 0xdd, 0x10, + 0x9f, 0x73, 0x69, 0xfc, 0x22, 0x89, 0x93, 0xbd, 0xad, 0xd0, 0x79, 0x38, 0xd7, 0x6a, 0xf4, 0x70, + 0x73, 0x5d, 0xeb, 0x3d, 0xd8, 0x6a, 0x68, 0xdb, 0xed, 0xee, 0x56, 0x63, 0xbd, 0xb9, 0xd1, 0x6c, + 0xdc, 0x96, 0x63, 0xe8, 0x1c, 0x9c, 0x8e, 0x06, 0xd7, 0x3b, 0xdb, 0xed, 0x5e, 0x03, 0xcb, 0x12, + 0x3a, 0x03, 0x0b, 0xd1, 0xc0, 0x9d, 0xda, 0xf6, 0x9d, 0x86, 0x1c, 0x47, 0x8b, 0x70, 0x26, 0x0a, + 0x6f, 0x36, 0xbb, 0xbd, 0xce, 0x1d, 0x5c, 0x6b, 0xc9, 0x09, 0xa4, 0xc2, 0xd2, 0xdc, 0x8a, 0x30, + 0x9e, 0x3c, 0x7e, 0x54, 0x77, 0xbb, 0xd5, 0xaa, 0xe1, 0x07, 0x72, 0x0a, 0x95, 0x41, 0x8e, 0x06, + 0x9a, 0xed, 0x8d, 0x8e, 0x9c, 0x46, 0x0a, 0x94, 0x67, 0xd2, 0x7b, 0xb5, 0x5e, 0xa3, 0xdb, 0xe8, + 0xc9, 0x99, 0xea, 0x8f, 0x12, 0xa0, 0x2e, 0x73, 0x88, 0x6e, 0xce, 0x58, 0xf9, 0x12, 0x64, 0x7b, + 0xc4, 0xd2, 0x2d, 0xd6, 0xbc, 0x2d, 0x5e, 0x39, 0x87, 0x83, 0x39, 0xd7, 0xbe, 0x97, 0x26, 0x4a, + 0x38, 0xe3, 0x1d, 0xd1, 0x4d, 0xb0, 0x9f, 0xe6, 0xb7, 0xeb, 0xcb, 0xb7, 0xd4, 0xae, 0xdf, 0x48, + 0x50, 0xf4, 0x0e, 0xa2, 0x13, 0xdb, 0xa2, 0x04, 0x21, 0x48, 0xf6, 0xed, 0x81, 0x2b, 0x88, 0x14, + 0x16, 0x63, 0xee, 0x7f, 0xa6, 0xbb, 0x5e, 0xd0, 0xcc, 0x61, 0x7f, 0xca, 0x23, 0x5d, 0xaf, 0x79, + 0x5d, 0xa5, 0xf9, 0x53, 0xa4, 0x02, 0x6c, 0x86, 0x4d, 0x9a, 0x14, 0xc1, 0x08, 0xc2, 0x55, 0xda, + 0x08, 0x3a, 0x31, 0xe5, 0xaa, 0x34, 0x00, 0xaa, 0x7f, 0x48, 0x00, 0xa1, 0x8d, 0xa0, 0x1a, 0xa4, + 0x5d, 0xd9, 0x7b, 0x9f, 0xc2, 0x48, 0xb7, 0x0b, 0x4f, 0xdb, 0xd2, 0x0d, 0xa7, 0x5e, 0xf6, 0xfc, + 0xb5, 0x20, 0xa0, 0xda, 0x40, 0x9f, 0x30, 0xe2, 0x60, 0x6f, 0xe1, 0xbf, 0xb0, 0x99, 0x1b, 0x51, + 0xaf, 0x70, 0x5d, 0x06, 0xcd, 0x7b, 0xc5, 0xbc, 0x53, 0xcc, 0xda, 0x53, 0xf2, 0x1f, 0xd8, 0x53, + 0xf5, 0x7d, 0xc8, 0x05, 0xf7, 0xe1, 0x95, 0xe0, 0x66, 0x2e, 0x2a, 0x51, 0xc0, 0x62, 0x3c, 0xdb, + 0xf1, 0x05, 0xaf, 0xe3, 0xab, 0x35, 0x48, 0xbb, 0x57, 0x08, 0xe3, 0x52, 0xd4, 0x11, 0x2e, 0x41, + 0x21, 0x30, 0x00, 0xcd, 0xa4, 0x62, 0x71, 0x02, 0xe7, 0x03, 0xac, 0x45, 0xab, 0xdf, 0xc6, 0xa1, + 0x34, 0xfb, 0x5d, 0x47, 0x1f, 0xcc, 0x58, 0xc3, 0xe5, 0xd7, 0x7d, 0xff, 0xe7, 0xed, 0xe1, 0x2a, + 0x20, 0x53, 0x60, 0xda, 0x50, 0x37, 0x8d, 0xf1, 0xbe, 0xf8, 0x26, 0x79, 0xca, 0x91, 0xdd, 0xc8, + 0x86, 0x08, 0xf0, 0x4f, 0x11, 0xbf, 0x26, 0x37, 0x0f, 0x21, 0x91, 0x1c, 0x16, 0x63, 0x8e, 0x71, + 0xd7, 0x10, 0xba, 0xc8, 0x61, 0x31, 0xae, 0xee, 0xcf, 0xb8, 0x47, 0x1e, 0x32, 0xdb, 0xed, 0xbb, + 0xed, 0xce, 0xfd, 0xb6, 0x1c, 0xe3, 0x93, 0xd0, 0x21, 0x72, 0x90, 0xf2, 0x5d, 0xa1, 0x08, 0xb9, + 0xa8, 0x13, 0x20, 0x28, 0xcd, 0x75, 0x7f, 0x1e, 0x32, 0x61, 0xc7, 0x67, 0x21, 0xe9, 0x75, 0x79, + 0x01, 0xb2, 0x91, 0xce, 0xbe, 0x0b, 0x69, 0xf7, 0xe8, 0xb7, 0x20, 0xc4, 0xea, 0x17, 0x12, 0x64, + 0x7d, 0xf1, 0xbc, 0x0d, 0x61, 0x9f, 0xfc, 0x11, 0x38, 0x5e, 0xf2, 0xc4, 0x7c, 0xc9, 0xff, 0x4c, + 0x41, 0x2e, 0x10, 0x23, 0xba, 0x08, 0xb9, 0xbe, 0x3d, 0xb5, 0x98, 0x66, 0x58, 0x4c, 0x94, 0x3c, + 0xb9, 0x19, 0xc3, 0x59, 0x01, 0x35, 0x2d, 0x86, 0x2e, 0x41, 0xde, 0x0d, 0x0f, 0xc7, 0xb6, 0xee, + 0xba, 0x95, 0xb4, 0x19, 0xc3, 0x20, 0xc0, 0x0d, 0x8e, 0x21, 0x19, 0x12, 0x74, 0x6a, 0x8a, 0x93, + 0x24, 0xcc, 0x87, 0xe8, 0x2c, 0xa4, 0x69, 0x7f, 0x44, 0x4c, 0x5d, 0x14, 0x77, 0x01, 0x7b, 0x33, + 0xf4, 0x3f, 0x28, 0x7d, 0x46, 0x1c, 0x5b, 0x63, 0x23, 0x87, 0xd0, 0x91, 0x3d, 0x1e, 0x88, 0x42, + 0x4b, 0xb8, 0xc8, 0xd1, 0x9e, 0x0f, 0xa2, 0xff, 0x7b, 0x69, 0x21, 0xaf, 0xb4, 0xe0, 0x25, 0xe1, + 0x02, 0xc7, 0xd7, 0x7d, 0x6e, 0x57, 0x40, 0x8e, 0xe4, 0xb9, 0x04, 0x33, 0x82, 0xa0, 0x84, 0x4b, + 0x41, 0xa6, 0x4b, 0xb2, 0x06, 0x25, 0x8b, 0xec, 0xe9, 0xcc, 0x78, 0x44, 0x34, 0x3a, 0xd1, 0x2d, + 0xaa, 0x64, 0x8f, 0xff, 0x0a, 0xa8, 0x4f, 0xfb, 0x0f, 0x09, 0xeb, 0x4e, 0x74, 0xcb, 0xeb, 0xd0, + 0xa2, 0xbf, 0x82, 0x63, 0x14, 0xbd, 0x03, 0xa7, 0x82, 0x2d, 0x06, 0x64, 0xcc, 0x74, 0xaa, 0xe4, + 0x96, 0x13, 0x2b, 0x08, 0x07, 0x3b, 0xdf, 0x16, 0xe8, 0x4c, 0xa2, 0xe0, 0x46, 0x15, 0x58, 0x4e, + 0xac, 0x48, 0x61, 0xa2, 0x20, 0xc6, 0xed, 0xad, 0x34, 0xb1, 0xa9, 0x11, 0x21, 0x95, 0x7f, 0x33, + 0x29, 0x7f, 0x45, 0x40, 0x2a, 0xd8, 0xc2, 0x23, 0x55, 0x70, 0x49, 0xf9, 0x70, 0x48, 0x2a, 0x48, + 0xf4, 0x48, 0x15, 0x5d, 0x52, 0x3e, 0xec, 0x91, 0xba, 0x05, 0xe0, 0x10, 0x4a, 0x98, 0x36, 0xe2, + 0x2f, 0x5f, 0x12, 0x26, 0x70, 0xf1, 0x04, 0x1b, 0x5b, 0xc5, 0x3c, 0x6b, 0xd3, 0xb0, 0x18, 0xce, + 0x39, 0xfe, 0x70, 0x4e, 0x7f, 0xa7, 0xe6, 0xf4, 0x87, 0x2e, 0x43, 0xb1, 0x3f, 0xa5, 0xcc, 0x36, + 0x35, 0x21, 0x59, 0xaa, 0xc8, 0x82, 0x47, 0xc1, 0x05, 0x77, 0x04, 0x56, 0xbd, 0x09, 0xb9, 0x60, + 0xff, 0xd9, 0xa6, 0xcf, 0x40, 0xe2, 0x41, 0xa3, 0x2b, 0x4b, 0x28, 0x0d, 0xf1, 0x76, 0x47, 0x8e, + 0x87, 0x8d, 0x9f, 0x58, 0x4a, 0x7e, 0xf9, 0xbd, 0x2a, 0xd5, 0x33, 0x90, 0x12, 0x37, 0xac, 0x17, + 0x00, 0x42, 0x81, 0x54, 0x6f, 0x01, 0x84, 0xaf, 0xc9, 0x35, 0x6a, 0x0f, 0x87, 0x94, 0xb8, 0xa2, + 0x5f, 0xc0, 0xde, 0x8c, 0xe3, 0x63, 0x62, 0xed, 0xb1, 0x91, 0xd0, 0x7a, 0x11, 0x7b, 0xb3, 0x2b, + 0x15, 0x80, 0xf0, 0x37, 0x38, 0x27, 0x51, 0xdb, 0x6a, 0xca, 0x31, 0x6e, 0x1d, 0x78, 0xfb, 0x5e, + 0x43, 0x96, 0xea, 0x1f, 0x1f, 0x3c, 0x57, 0x63, 0x4f, 0x9f, 0xab, 0xb1, 0x57, 0xcf, 0x55, 0xe9, + 0xf3, 0x23, 0x55, 0xfa, 0xe1, 0x48, 0x95, 0x9e, 0x1c, 0xa9, 0xd2, 0xc1, 0x91, 0x2a, 0xfd, 0x7a, + 0xa4, 0x4a, 0x2f, 0x8f, 0xd4, 0xd8, 0xab, 0x23, 0x55, 0xfa, 0xfa, 0x85, 0x1a, 0x3b, 0x78, 0xa1, + 0xc6, 0x9e, 0xbe, 0x50, 0x63, 0x9f, 0x04, 0x7f, 0x1e, 0x77, 0xd3, 0xe2, 0xdf, 0xe2, 0xf5, 0xbf, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xb1, 0x8c, 0x07, 0xad, 0x5d, 0x0e, 0x00, 0x00, } func (x SourceEnum) String() string { @@ -1547,6 +1558,9 @@ func (this *WriteRequest) Equal(that interface{}) bool { if !this.MessageWithBufRef.Equal(that1.MessageWithBufRef) { return false } + if this.DiscardOutOfOrder != that1.DiscardOutOfOrder { + return false + } return true } func (this *WriteRequestV2) Equal(that interface{}) bool { @@ -2235,7 +2249,7 @@ func (this *WriteRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 10) s = append(s, "&cortexpb.WriteRequest{") s = append(s, "Timeseries: "+fmt.Sprintf("%#v", this.Timeseries)+",\n") s = append(s, "Source: "+fmt.Sprintf("%#v", this.Source)+",\n") @@ -2244,6 +2258,7 @@ func (this *WriteRequest) GoString() string { } s = append(s, "SkipLabelNameValidation: "+fmt.Sprintf("%#v", this.SkipLabelNameValidation)+",\n") s = append(s, "MessageWithBufRef: "+fmt.Sprintf("%#v", this.MessageWithBufRef)+",\n") + s = append(s, "DiscardOutOfOrder: "+fmt.Sprintf("%#v", this.DiscardOutOfOrder)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -2567,6 +2582,18 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.DiscardOutOfOrder { + i-- + if m.DiscardOutOfOrder { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x3e + i-- + dAtA[i] = 0xd0 + } { size := m.MessageWithBufRef.Size() i -= size @@ -3569,6 +3596,9 @@ func (m *WriteRequest) Size() (n int) { } l = m.MessageWithBufRef.Size() n += 2 + l + sovCortex(uint64(l)) + if m.DiscardOutOfOrder { + n += 3 + } return n } @@ -3995,6 +4025,7 @@ func (this *WriteRequest) String() string { `Metadata:` + repeatedStringForMetadata + `,`, `SkipLabelNameValidation:` + fmt.Sprintf("%v", this.SkipLabelNameValidation) + `,`, `MessageWithBufRef:` + fmt.Sprintf("%v", this.MessageWithBufRef) + `,`, + `DiscardOutOfOrder:` + fmt.Sprintf("%v", this.DiscardOutOfOrder) + `,`, `}`, }, "") return s @@ -4492,6 +4523,26 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 1002: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DiscardOutOfOrder", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.DiscardOutOfOrder = bool(v != 0) default: iNdEx = preIndex skippy, err := skipCortex(dAtA[iNdEx:]) diff --git a/pkg/cortexpb/cortex.proto b/pkg/cortexpb/cortex.proto index 8e27aa1ffa8..62885116953 100644 --- a/pkg/cortexpb/cortex.proto +++ b/pkg/cortexpb/cortex.proto @@ -24,6 +24,8 @@ message WriteRequest { bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "MessageWithBufRef", (gogoproto.nullable) = false]; + // When true, indicates that out-of-order samples should be discarded even if OOO is enabled. + bool discard_out_of_order = 1002; } // refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index 4d780bba6a1..194c61b5287 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -103,6 +103,7 @@ func ReuseWriteRequest(req *PreallocWriteRequest) { req.Source = 0 req.Metadata = nil req.Timeseries = nil + req.DiscardOutOfOrder = false writeRequestPool.Put(req) } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 188e6df98df..9d46a27fa1c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -977,7 +977,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s } } - return d.send(localCtx, ingester, timeseries, metadata, req.Source) + return d.send(localCtx, ingester, timeseries, metadata, req.Source, req.DiscardOutOfOrder) }, func() { cortexpb.ReuseSlice(req.Timeseries) req.Free() @@ -1230,7 +1230,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) { }) } -func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum) error { +func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum, discardOutOfOrder bool) error { h, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { return err @@ -1248,9 +1248,10 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time if d.cfg.UseStreamPush { req := &cortexpb.WriteRequest{ - Timeseries: timeseries, - Metadata: metadata, - Source: source, + Timeseries: timeseries, + Metadata: metadata, + Source: source, + DiscardOutOfOrder: discardOutOfOrder, } _, err = c.PushStreamConnection(ctx, req) } else { @@ -1258,6 +1259,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time req.Timeseries = timeseries req.Metadata = metadata req.Source = source + req.DiscardOutOfOrder = discardOutOfOrder _, err = c.PushPreAlloc(ctx, req) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d53dc35a573..23cadcfa7a6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1428,6 +1428,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Walk the samples, appending them to the users database app := db.Appender(ctx).(extendedAppender) + + // Even when OOO is enabled globally, we want to reject OOO samples in some cases. + // prometheus implementation: https://github.com/prometheus/prometheus/pull/14710 + if req.DiscardOutOfOrder { + app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) + } + var newSeries []labels.Labels for _, ts := range req.Timeseries { diff --git a/pkg/ingester/ingester_ooo_test.go b/pkg/ingester/ingester_ooo_test.go new file mode 100644 index 00000000000..6b863d716e0 --- /dev/null +++ b/pkg/ingester/ingester_ooo_test.go @@ -0,0 +1,205 @@ +package ingester + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" +) + +// mockAppender implements the extendedAppender interface for testing +type mockAppender struct { + storage.Appender + lastOptions *storage.AppendOptions +} + +func (m *mockAppender) SetOptions(opts *storage.AppendOptions) { + m.lastOptions = opts +} + +func TestIngester_Push_DiscardOutOfOrder_True(t *testing.T) { + req := &cortexpb.WriteRequest{ + Source: cortexpb.RULE, + DiscardOutOfOrder: true, + Timeseries: []cortexpb.PreallocTimeseries{}, + } + + assert.True(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should be true") + assert.True(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return true") +} + +func TestIngester_Push_DiscardOutOfOrder_Default(t *testing.T) { + // Create a WriteRequest without setting DiscardOutOfOrder + req := &cortexpb.WriteRequest{ + Source: cortexpb.API, + Timeseries: []cortexpb.PreallocTimeseries{}, + } + + // Verify the default value is false + assert.False(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should default to false") + assert.False(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return false by default") +} + +func TestIngester_WriteRequest_MultipleScenarios(t *testing.T) { + scenarios := []struct { + name string + setupReq func() *cortexpb.WriteRequest + expectOpts bool + description string + }{ + { + name: "Stale marker during rule migration", + setupReq: func() *cortexpb.WriteRequest { + return &cortexpb.WriteRequest{ + Source: cortexpb.RULE, + DiscardOutOfOrder: true, + } + }, + expectOpts: true, + description: "Should set appender options to discard OOO", + }, + { + name: "Normal rule evaluation", + setupReq: func() *cortexpb.WriteRequest { + return &cortexpb.WriteRequest{ + Source: cortexpb.RULE, + DiscardOutOfOrder: false, + } + }, + expectOpts: false, + description: "Should not set appender options", + }, + { + name: "API write request", + setupReq: func() *cortexpb.WriteRequest { + return &cortexpb.WriteRequest{ + Source: cortexpb.API, + DiscardOutOfOrder: false, + } + }, + expectOpts: false, + description: "API requests should never trigger OOO discard", + }, + { + name: "Default values", + setupReq: func() *cortexpb.WriteRequest { + return &cortexpb.WriteRequest{} + }, + expectOpts: false, + description: "Default values should not trigger OOO discard", + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + req := scenario.setupReq() + mock := &mockAppender{} + + // Simulate the ingester logic + if req.DiscardOutOfOrder { + mock.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) + } + + // Verify expectations + if scenario.expectOpts { + require.NotNil(t, mock.lastOptions) + assert.True(t, mock.lastOptions.DiscardOutOfOrder) + } + }) + } +} + +func TestIngester_DiscardOutOfOrderFlagIngegrationTest(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + limits := defaultLimitsTestConfig() + limits.EnableNativeHistograms = true + limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute) + + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // Create labels for our test metric + metricLabels := labels.FromStrings("__name__", "test_metric", "job", "test") + + currentTime := time.Now().UnixMilli() + olderTime := currentTime - 60000 // 1 minute earlier (within OOO window) + + // First, push a sample with current timestamp with discardOutOfOrder=true + req1 := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 100, TimestampMs: currentTime}}, + nil, nil, cortexpb.RULE) + req1.DiscardOutOfOrder = true + + _, err = i.Push(ctx, req1) + require.NoError(t, err, "First sample push should succeed") + + // Now try to push a sample with older timestamp with discardOutOfOrder=true + // This should be discarded because DiscardOutOfOrder is true + req2 := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 50, TimestampMs: olderTime}}, + nil, nil, cortexpb.RULE) + req2.DiscardOutOfOrder = true + + _, _ = i.Push(ctx, req2) + + // Query back the data to ensure only the first (current time) sample was stored + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(&client.QueryRequest{ + StartTimestampMs: olderTime - 1000, + EndTimestampMs: currentTime + 1000, + Matchers: []*client.LabelMatcher{ + {Type: client.EQUAL, Name: "__name__", Value: "test_metric"}, + }, + }, s) + require.NoError(t, err) + + // Verify we only have one series with one sample (the current time sample) + require.Len(t, s.series, 1, "Should have exactly one series") + + // Convert chunks to samples to verify content + series := s.series[0] + require.Len(t, series.Chunks, 1, "Should have exactly one chunk") + + chunk := series.Chunks[0] + chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data) + require.NoError(t, err) + + iter := chunkData.Iterator(nil) + sampleCount := 0 + for iter.Next() != chunkenc.ValNone { + ts, val := iter.At() + require.Equal(t, currentTime, ts, "Sample timestamp should match current time") + require.Equal(t, 100.0, val, "Sample value should match first push") + sampleCount++ + } + require.NoError(t, iter.Err()) + require.Equal(t, 1, sampleCount, "Should have exactly one sample stored") +} diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 0dc5c0210eb..3a13151b4c6 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -49,6 +49,7 @@ type PusherAppender struct { histogramLabels []labels.Labels histograms []cortexpb.Histogram userID string + opts *storage.AppendOptions } func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { @@ -73,7 +74,9 @@ func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v return 0, nil } -func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {} +func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) { + a.opts = opts +} func (a *PusherAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { // AppendHistogramCTZeroSample is a no-op for PusherAppender as it happens during scrape time only. @@ -94,6 +97,12 @@ func (a *PusherAppender) Commit() error { req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE) req.AddHistogramTimeSeries(a.histogramLabels, a.histograms) + + // Set DiscardOutOfOrder flag if requested via AppendOptions + if a.opts != nil && a.opts.DiscardOutOfOrder { + req.DiscardOutOfOrder = true + } + // Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push. // We shouldn't call client.ReuseSlice here. _, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req) diff --git a/pkg/ruler/compat_ooo_test.go b/pkg/ruler/compat_ooo_test.go new file mode 100644 index 00000000000..eff1af676ae --- /dev/null +++ b/pkg/ruler/compat_ooo_test.go @@ -0,0 +1,93 @@ +package ruler + +import ( + "context" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +type mockPusher struct { + lastRequest *cortexpb.WriteRequest + pushError error +} + +func (m *mockPusher) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + m.lastRequest = req + return &cortexpb.WriteResponse{}, m.pushError +} + +func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) { + mock := &mockPusher{} + counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) + + appender := &PusherAppender{ + ctx: context.Background(), + pusher: mock, + userID: "test-user", + totalWrites: counter, + failedWrites: counter, + labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")}, + samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, + } + + appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) + + err := appender.Commit() + require.NoError(t, err) + + // Verify that DiscardOutOfOrder was set in the WriteRequest + require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent") + assert.True(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest") +} + +func TestPusherAppender_Commit_WithoutDiscardOutOfOrder(t *testing.T) { + mock := &mockPusher{} + counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) + + appender := &PusherAppender{ + ctx: context.Background(), + pusher: mock, + userID: "test-user", + totalWrites: counter, + failedWrites: counter, + labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")}, + samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, + } + + appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: false}) + + err := appender.Commit() + require.NoError(t, err) + + require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent") + assert.False(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be false in WriteRequest") +} + +func TestPusherAppender_Commit_WithNilOptions(t *testing.T) { + mock := &mockPusher{} + counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) + + appender := &PusherAppender{ + ctx: context.Background(), + pusher: mock, + userID: "test-user", + totalWrites: counter, + failedWrites: counter, + labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")}, + samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, + opts: nil, // Explicitly nil + } + + err := appender.Commit() + require.NoError(t, err) + + require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent") + assert.False(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be false when opts is nil") +} From 0974b28a4963130f702320488de05ba7dbca056f Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Thu, 5 Mar 2026 20:01:12 +0000 Subject: [PATCH 2/3] fixup! adding discardOutOfOrder field in write request Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 2 +- pkg/distributor/distributor_test.go | 92 +++++++++++-- pkg/ingester/ingester_ooo_test.go | 205 ---------------------------- pkg/ingester/ingester_test.go | 80 +++++++++++ pkg/ruler/compat_ooo_test.go | 93 ------------- pkg/ruler/compat_test.go | 24 ++++ 6 files changed, 189 insertions(+), 307 deletions(-) delete mode 100644 pkg/ingester/ingester_ooo_test.go delete mode 100644 pkg/ruler/compat_ooo_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d8f6b89cf8a..797b85f0552 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,7 +43,7 @@ * [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 * [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246 * [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253 -* [ENHANCEMENT] discard ooo samples in some special cases #7227 +* [ENHANCEMENT] Ruler/Ingester: Propagate append hints to discard out of order samples on Ingester #7226 * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ff3039f53ff..6b60e48cdcb 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -400,6 +400,78 @@ func TestDistributor_Push(t *testing.T) { } } +func TestDistributor_Push_DiscardOutOfOrder(t *testing.T) { + t.Parallel() + + ctx := user.InjectOrgID(context.Background(), "userDiscardOOO") + + tests := []struct { + name string + discardOutOfOrder bool + expectedDiscardOOO bool + useStreamPush bool + }{ + { + name: "DiscardOutOfOrder=true with regular push", + discardOutOfOrder: true, + expectedDiscardOOO: true, + useStreamPush: false, + }, + { + name: "DiscardOutOfOrder=false with regular push", + discardOutOfOrder: false, + expectedDiscardOOO: false, + useStreamPush: false, + }, + { + name: "DiscardOutOfOrder=true with stream push", + discardOutOfOrder: true, + expectedDiscardOOO: true, + useStreamPush: true, + }, + { + name: "DiscardOutOfOrder=false with stream push", + discardOutOfOrder: false, + expectedDiscardOOO: false, + useStreamPush: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + ds, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + useStreamPush: tc.useStreamPush, + }) + + request := makeWriteRequest(123456789000, 5, 0, 0) + request.DiscardOutOfOrder = tc.discardOutOfOrder + + _, err := ds[0].Push(ctx, request) + require.NoError(t, err) + + // Verify all ingesters received the correct DiscardOutOfOrder flag + for _, ing := range ingesters { + ing.Lock() + lastDiscardOOO := ing.lastDiscardOutOfOrder + ing.Unlock() + + assert.Equal(t, tc.expectedDiscardOOO, lastDiscardOOO, + "ingester should have received DiscardOutOfOrder=%v", tc.expectedDiscardOOO) + } + }) + } +} + func TestDistributor_MetricsCleanup(t *testing.T) { t.Parallel() dists, _, regs, r := prepare(t, prepConfig{ @@ -3547,14 +3619,15 @@ type mockIngester struct { sync.Mutex client.IngesterClient grpc_health_v1.HealthClient - happy atomic.Bool - failResp atomic.Error - stats client.UsersStatsResponse - timeseries map[uint32]*cortexpb.PreallocTimeseries - metadata map[uint32]map[cortexpb.MetricMetadata]struct{} - queryDelay time.Duration - calls map[string]int - lblsValues []string + happy atomic.Bool + failResp atomic.Error + stats client.UsersStatsResponse + timeseries map[uint32]*cortexpb.PreallocTimeseries + metadata map[uint32]map[cortexpb.MetricMetadata]struct{} + queryDelay time.Duration + calls map[string]int + lblsValues []string + lastDiscardOutOfOrder bool } func newMockIngester(id int, ps *prepState, cfg prepConfig) *mockIngester { @@ -3625,6 +3698,9 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt i.trackCall("Push") + // Store the DiscardOutOfOrder flag for test assertions + i.lastDiscardOutOfOrder = req.DiscardOutOfOrder + if !i.happy.Load() { return nil, i.failResp.Load() } diff --git a/pkg/ingester/ingester_ooo_test.go b/pkg/ingester/ingester_ooo_test.go deleted file mode 100644 index 6b863d716e0..00000000000 --- a/pkg/ingester/ingester_ooo_test.go +++ /dev/null @@ -1,205 +0,0 @@ -package ingester - -import ( - "context" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/weaveworks/common/user" - - "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util/services" - "github.com/cortexproject/cortex/pkg/util/test" -) - -// mockAppender implements the extendedAppender interface for testing -type mockAppender struct { - storage.Appender - lastOptions *storage.AppendOptions -} - -func (m *mockAppender) SetOptions(opts *storage.AppendOptions) { - m.lastOptions = opts -} - -func TestIngester_Push_DiscardOutOfOrder_True(t *testing.T) { - req := &cortexpb.WriteRequest{ - Source: cortexpb.RULE, - DiscardOutOfOrder: true, - Timeseries: []cortexpb.PreallocTimeseries{}, - } - - assert.True(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should be true") - assert.True(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return true") -} - -func TestIngester_Push_DiscardOutOfOrder_Default(t *testing.T) { - // Create a WriteRequest without setting DiscardOutOfOrder - req := &cortexpb.WriteRequest{ - Source: cortexpb.API, - Timeseries: []cortexpb.PreallocTimeseries{}, - } - - // Verify the default value is false - assert.False(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should default to false") - assert.False(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return false by default") -} - -func TestIngester_WriteRequest_MultipleScenarios(t *testing.T) { - scenarios := []struct { - name string - setupReq func() *cortexpb.WriteRequest - expectOpts bool - description string - }{ - { - name: "Stale marker during rule migration", - setupReq: func() *cortexpb.WriteRequest { - return &cortexpb.WriteRequest{ - Source: cortexpb.RULE, - DiscardOutOfOrder: true, - } - }, - expectOpts: true, - description: "Should set appender options to discard OOO", - }, - { - name: "Normal rule evaluation", - setupReq: func() *cortexpb.WriteRequest { - return &cortexpb.WriteRequest{ - Source: cortexpb.RULE, - DiscardOutOfOrder: false, - } - }, - expectOpts: false, - description: "Should not set appender options", - }, - { - name: "API write request", - setupReq: func() *cortexpb.WriteRequest { - return &cortexpb.WriteRequest{ - Source: cortexpb.API, - DiscardOutOfOrder: false, - } - }, - expectOpts: false, - description: "API requests should never trigger OOO discard", - }, - { - name: "Default values", - setupReq: func() *cortexpb.WriteRequest { - return &cortexpb.WriteRequest{} - }, - expectOpts: false, - description: "Default values should not trigger OOO discard", - }, - } - - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - req := scenario.setupReq() - mock := &mockAppender{} - - // Simulate the ingester logic - if req.DiscardOutOfOrder { - mock.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) - } - - // Verify expectations - if scenario.expectOpts { - require.NotNil(t, mock.lastOptions) - assert.True(t, mock.lastOptions.DiscardOutOfOrder) - } - }) - } -} - -func TestIngester_DiscardOutOfOrderFlagIngegrationTest(t *testing.T) { - registry := prometheus.NewRegistry() - cfg := defaultIngesterTestConfig(t) - cfg.LifecyclerConfig.JoinAfter = 0 - - limits := defaultLimitsTestConfig() - limits.EnableNativeHistograms = true - limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute) - - i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck - - // Wait until it's ACTIVE - test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { - return i.lifecycler.GetState() - }) - - ctx := user.InjectOrgID(context.Background(), "test-user") - - // Create labels for our test metric - metricLabels := labels.FromStrings("__name__", "test_metric", "job", "test") - - currentTime := time.Now().UnixMilli() - olderTime := currentTime - 60000 // 1 minute earlier (within OOO window) - - // First, push a sample with current timestamp with discardOutOfOrder=true - req1 := cortexpb.ToWriteRequest( - []labels.Labels{metricLabels}, - []cortexpb.Sample{{Value: 100, TimestampMs: currentTime}}, - nil, nil, cortexpb.RULE) - req1.DiscardOutOfOrder = true - - _, err = i.Push(ctx, req1) - require.NoError(t, err, "First sample push should succeed") - - // Now try to push a sample with older timestamp with discardOutOfOrder=true - // This should be discarded because DiscardOutOfOrder is true - req2 := cortexpb.ToWriteRequest( - []labels.Labels{metricLabels}, - []cortexpb.Sample{{Value: 50, TimestampMs: olderTime}}, - nil, nil, cortexpb.RULE) - req2.DiscardOutOfOrder = true - - _, _ = i.Push(ctx, req2) - - // Query back the data to ensure only the first (current time) sample was stored - s := &mockQueryStreamServer{ctx: ctx} - err = i.QueryStream(&client.QueryRequest{ - StartTimestampMs: olderTime - 1000, - EndTimestampMs: currentTime + 1000, - Matchers: []*client.LabelMatcher{ - {Type: client.EQUAL, Name: "__name__", Value: "test_metric"}, - }, - }, s) - require.NoError(t, err) - - // Verify we only have one series with one sample (the current time sample) - require.Len(t, s.series, 1, "Should have exactly one series") - - // Convert chunks to samples to verify content - series := s.series[0] - require.Len(t, series.Chunks, 1, "Should have exactly one chunk") - - chunk := series.Chunks[0] - chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data) - require.NoError(t, err) - - iter := chunkData.Iterator(nil) - sampleCount := 0 - for iter.Next() != chunkenc.ValNone { - ts, val := iter.At() - require.Equal(t, currentTime, ts, "Sample timestamp should match current time") - require.Equal(t, 100.0, val, "Sample value should match first push") - sampleCount++ - } - require.NoError(t, iter.Err()) - require.Equal(t, 1, sampleCount, "Should have exactly one sample stored") -} diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index eb509e4e352..8f09aab9876 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -7863,3 +7863,83 @@ func TestIngester_checkRegexMatcherLimits(t *testing.T) { }) } } +func TestIngester_DiscardOutOfOrderFlagIntegration(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + limits := defaultLimitsTestConfig() + limits.EnableNativeHistograms = true + limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute) + + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, time.Second, ring.ACTIVE, func() any { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // Create labels for our test metric + metricLabels := labels.FromStrings(labels.MetricName, "test_metric", "job", "test") + + currentTime := time.Now().UnixMilli() + olderTime := currentTime - 60000 // 1 minute earlier (within OOO window) + + // First, push a sample with current timestamp with discardOutOfOrder=true + req1 := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 100, TimestampMs: currentTime}}, + nil, nil, cortexpb.RULE) + req1.DiscardOutOfOrder = true + + _, err = i.Push(ctx, req1) + require.NoError(t, err, "First sample push should succeed") + + // Now try to push a sample with older timestamp with discardOutOfOrder=true + // This should be discarded because DiscardOutOfOrder is true + req2 := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 50, TimestampMs: olderTime}}, + nil, nil, cortexpb.RULE) + req2.DiscardOutOfOrder = true + + _, _ = i.Push(ctx, req2) + + // Query back the data to ensure only the first (current time) sample was stored + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(&client.QueryRequest{ + StartTimestampMs: olderTime - 1000, + EndTimestampMs: currentTime + 1000, + Matchers: []*client.LabelMatcher{ + {Type: client.EQUAL, Name: labels.MetricName, Value: "test_metric"}, + }, + }, s) + require.NoError(t, err) + + // Verify we only have one series with one sample (the current time sample) + require.Len(t, s.series, 1, "Should have exactly one series") + + // Convert chunks to samples to verify content + series := s.series[0] + require.Len(t, series.Chunks, 1, "Should have exactly one chunk") + + chunk := series.Chunks[0] + chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data) + require.NoError(t, err) + + iter := chunkData.Iterator(nil) + sampleCount := 0 + for iter.Next() != chunkenc.ValNone { + ts, val := iter.At() + require.Equal(t, currentTime, ts, "Sample timestamp should match current time") + require.Equal(t, 100.0, val, "Sample value should match first push") + sampleCount++ + } + require.NoError(t, iter.Err()) + require.Equal(t, 1, sampleCount, "Should have exactly one sample stored") +} diff --git a/pkg/ruler/compat_ooo_test.go b/pkg/ruler/compat_ooo_test.go deleted file mode 100644 index eff1af676ae..00000000000 --- a/pkg/ruler/compat_ooo_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package ruler - -import ( - "context" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/cortexpb" -) - -type mockPusher struct { - lastRequest *cortexpb.WriteRequest - pushError error -} - -func (m *mockPusher) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { - m.lastRequest = req - return &cortexpb.WriteResponse{}, m.pushError -} - -func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) { - mock := &mockPusher{} - counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) - - appender := &PusherAppender{ - ctx: context.Background(), - pusher: mock, - userID: "test-user", - totalWrites: counter, - failedWrites: counter, - labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")}, - samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, - } - - appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) - - err := appender.Commit() - require.NoError(t, err) - - // Verify that DiscardOutOfOrder was set in the WriteRequest - require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent") - assert.True(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest") -} - -func TestPusherAppender_Commit_WithoutDiscardOutOfOrder(t *testing.T) { - mock := &mockPusher{} - counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) - - appender := &PusherAppender{ - ctx: context.Background(), - pusher: mock, - userID: "test-user", - totalWrites: counter, - failedWrites: counter, - labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")}, - samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, - } - - appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: false}) - - err := appender.Commit() - require.NoError(t, err) - - require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent") - assert.False(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be false in WriteRequest") -} - -func TestPusherAppender_Commit_WithNilOptions(t *testing.T) { - mock := &mockPusher{} - counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) - - appender := &PusherAppender{ - ctx: context.Background(), - pusher: mock, - userID: "test-user", - totalWrites: counter, - failedWrites: counter, - labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")}, - samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, - opts: nil, // Explicitly nil - } - - err := appender.Commit() - require.NoError(t, err) - - require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent") - assert.False(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be false when opts is nil") -} diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 19d45062e74..1d0c5d2a8b4 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -413,3 +414,26 @@ func TestRecordAndReportRuleQueryMetrics(t *testing.T) { require.Equal(t, testutil.ToFloat64(metrics.RulerQueryChunkBytes.WithLabelValues("userID")), float64(10)) require.Equal(t, testutil.ToFloat64(metrics.RulerQueryDataBytes.WithLabelValues("userID")), float64(14)) } +func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) { + pusher := &fakePusher{response: &cortexpb.WriteResponse{}} + counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) + + appender := &PusherAppender{ + ctx: context.Background(), + pusher: pusher, + userID: "test-user", + totalWrites: counter, + failedWrites: counter, + labels: []labels.Labels{labels.FromStrings(labels.MetricName, "test_metric")}, + samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, + } + + appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) + + err := appender.Commit() + require.NoError(t, err) + + // Verify that DiscardOutOfOrder was set in the WriteRequest + require.NotNil(t, pusher.request, "WriteRequest should have been sent") + require.True(t, pusher.request.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest") +} From b18bdd4790db1bb392707ae466ea6979f564b55a Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Thu, 5 Mar 2026 21:06:30 +0000 Subject: [PATCH 3/3] fixup! fixup! adding discardOutOfOrder field in write request Signed-off-by: Shvejan Mutheboyina --- pkg/distributor/distributor_test.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index f8557cae265..2b3b91b0fe3 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -489,7 +489,28 @@ func TestDistributor_Push_DiscardOutOfOrder(t *testing.T) { _, err := ds[0].Push(ctx, request) require.NoError(t, err) - // Verify all ingesters received the correct DiscardOutOfOrder flag + // Poll to ensure all ingesters have received the push before verifying. + test.Poll(t, time.Second, nil, func() any { + for _, ing := range ingesters { + ing.Lock() + pushCalls := ing.calls["Push"] + lastDiscardOOO := ing.lastDiscardOutOfOrder + ing.Unlock() + + // Wait for all ingesters to receive the push call + if pushCalls == 0 { + return fmt.Errorf("ingester has not received push yet") + } + + // Wait for the DiscardOutOfOrder flag to match expected value + if lastDiscardOOO != tc.expectedDiscardOOO { + return fmt.Errorf("ingester has DiscardOutOfOrder=%v, expected %v", lastDiscardOOO, tc.expectedDiscardOOO) + } + } + return nil + }) + + // Final assertion: verify all ingesters received the correct DiscardOutOfOrder flag for _, ing := range ingesters { ing.Lock() lastDiscardOOO := ing.lastDiscardOutOfOrder