
Commit 28ad1ac

querier: support series batching from Store Gateways (#7203)
* querier: support series batching from Store Gateways
* fix lint
* Add flag to experimental features
* bump thanos to 49dde505913b6b838ca0cd77bb63dd50b8b6fdba
* changelog change
* port batchable server to parquet SG
* Add BenchmarkParquetBucketStore_SeriesBatch

Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 5ca23ba commit 28ad1ac

18 files changed

Lines changed: 668 additions & 36 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
1414
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
1515
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
16+
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
1617
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
1718
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
1819
* [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
@@ -237,6 +237,11 @@ querier:
237237
# CLI flag: -querier.store-gateway-consistency-check-max-attempts
238238
[store_gateway_consistency_check_max_attempts: <int> | default = 3]
239239

240+
# [Experimental] The maximum number of series to be batched in a single gRPC
241+
# response message from Store Gateways. A value of 0 or 1 disables batching.
242+
# CLI flag: -querier.store-gateway-series-batch-size
243+
[store_gateway_series_batch_size: <int> | default = 1]
244+
240245
# The maximum number of times we attempt fetching data from ingesters for
241246
# retryable errors (ex. partial data returned).
242247
# CLI flag: -querier.ingester-query-max-attempts
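
The documented semantics ("a value of 0 or 1 disables batching") can be illustrated with a minimal sketch. It is not the Thanos or Cortex server code: `splitIntoMessages` is a hypothetical helper, and series are represented as plain strings. It only shows how a batch size could map onto the number of response messages sent for a given result set.

```go
package main

import "fmt"

// splitIntoMessages groups series into response messages holding at most
// batchSize series each. A batchSize of 0 or 1 yields one series per
// message, which mirrors the documented "batching disabled" behaviour.
// Hypothetical helper for illustration only.
func splitIntoMessages(series []string, batchSize int64) [][]string {
	if batchSize <= 1 {
		batchSize = 1
	}
	step := int(batchSize)

	var msgs [][]string
	for start := 0; start < len(series); start += step {
		end := start + step
		if end > len(series) {
			end = len(series)
		}
		msgs = append(msgs, series[start:end])
	}
	return msgs
}

func main() {
	series := []string{"s1", "s2", "s3", "s4", "s5"}
	for _, size := range []int64{0, 1, 2, 10} {
		msgs := splitIntoMessages(series, size)
		fmt.Printf("batch size %d -> %d messages\n", size, len(msgs))
	}
}
```

With five series, a batch size of 2 produces three messages (2, 2 and 1 series), while 0 and 1 both produce five single-series messages.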

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
@@ -4829,6 +4829,11 @@ store_gateway_client:
48294829
# CLI flag: -querier.store-gateway-consistency-check-max-attempts
48304830
[store_gateway_consistency_check_max_attempts: <int> | default = 3]
48314831

4832+
# [Experimental] The maximum number of series to be batched in a single gRPC
4833+
# response message from Store Gateways. A value of 0 or 1 disables batching.
4834+
# CLI flag: -querier.store-gateway-series-batch-size
4835+
[store_gateway_series_batch_size: <int> | default = 1]
4836+
48324837
# The maximum number of times we attempt fetching data from ingesters for
48334838
# retryable errors (ex. partial data returned).
48344839
# CLI flag: -querier.ingester-query-max-attempts

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
@@ -124,6 +124,7 @@ Currently experimental features are:
124124
- Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
125125
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
126126
- Querier: protobuf codec (`-api.querier-default-codec`)
127+
- Querier: Series batch size (`-querier.store-gateway-series-batch-size`)
127128
- Query-frontend: dynamic query splits
128129
- `querier.max-shards-per-query` (int) CLI flag
129130
- `querier.max-fetched-data-duration-per-query` (duration) CLI flag

go.mod

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ require (
5151
github.com/stretchr/testify v1.11.1
5252
github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488
5353
github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5
54-
github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008
54+
github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b
5555
github.com/uber/jaeger-client-go v2.30.0+incompatible
5656
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5
5757
go.etcd.io/etcd/api/v3 v3.5.17

go.sum

Lines changed: 2 additions & 2 deletions
@@ -1789,8 +1789,8 @@ github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488 h1:khBsQLLRoF1K
17891789
github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488/go.mod h1:uDHLkMKOGDAnlN75EAz8VrRzob1+VbgYSuUleatWuF0=
17901790
github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5 h1:hIg9M9TRha/qaLDdtwsTWsTDkewGHleVZaV2JsLY1vA=
17911791
github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc=
1792-
github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008 h1:+msnhVSAWx7AewLJllGEELJmhGciT5BmLoq6rUt4Aq4=
1793-
github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008/go.mod h1:B9TgiYdhZdVxB1jXi4hRV+XDhiMmhHFykb8cxsZyWG8=
1792+
github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b h1:KIQzAcxtdxi3PhrOpGP5t/TP7NBZqYvvcUvlu0q8fEQ=
1793+
github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b/go.mod h1:B9TgiYdhZdVxB1jXi4hRV+XDhiMmhHFykb8cxsZyWG8=
17941794
github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww=
17951795
github.com/tinylib/msgp v1.3.0/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
17961796
github.com/tjhop/slog-gokit v0.1.4 h1:uj/vbDt3HaF0Py8bHPV4ti/s0utnO0miRbO277FLBKM=

pkg/querier/blocks_store_queryable.go

Lines changed: 48 additions & 25 deletions
@@ -143,6 +143,7 @@ type BlocksStoreQueryable struct {
143143

144144
storeGatewayQueryStatsEnabled bool
145145
storeGatewayConsistencyCheckMaxAttempts int
146+
storeGatewaySeriesBatchSize int64
146147

147148
// Subservices manager.
148149
subservices *services.Manager
@@ -175,6 +176,7 @@ func NewBlocksStoreQueryable(
175176
limits: limits,
176177
storeGatewayQueryStatsEnabled: config.StoreGatewayQueryStatsEnabled,
177178
storeGatewayConsistencyCheckMaxAttempts: config.StoreGatewayConsistencyCheckMaxAttempts,
179+
storeGatewaySeriesBatchSize: config.StoreGatewaySeriesBatchSize,
178180
}
179181

180182
q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
@@ -306,6 +308,7 @@ func (q *BlocksStoreQueryable) Querier(mint, maxt int64) (storage.Querier, error
306308
queryStoreAfter: q.queryStoreAfter,
307309
storeGatewayQueryStatsEnabled: q.storeGatewayQueryStatsEnabled,
308310
storeGatewayConsistencyCheckMaxAttempts: q.storeGatewayConsistencyCheckMaxAttempts,
311+
storeGatewaySeriesBatchSize: q.storeGatewaySeriesBatchSize,
309312
}, nil
310313
}
311314

@@ -328,6 +331,9 @@ type blocksStoreQuerier struct {
328331

329332
// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
330333
storeGatewayConsistencyCheckMaxAttempts int
334+
335+
// The maximum number of series to be batched in a single gRPC response message from Store Gateways.
336+
storeGatewaySeriesBatchSize int64
331337
}
332338

333339
// Select implements storage.Querier interface.
@@ -648,7 +654,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
648654
seriesQueryStats := &hintspb.QueryStats{}
649655
skipChunks := sp != nil && sp.Func == "series"
650656

651-
req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, sp, shardingInfo, skipChunks, blockIDs, defaultAggrs)
657+
req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, sp, shardingInfo, skipChunks, blockIDs, defaultAggrs, q.storeGatewaySeriesBatchSize)
652658
if err != nil {
653659
return errors.Wrapf(err, "failed to create series request")
654660
}
@@ -670,6 +676,37 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
670676
myWarnings := annotations.Annotations(nil)
671677
myQueriedBlocks := []ulid.ULID(nil)
672678

679+
processSeries := func(s *storepb.Series) error {
680+
mySeries = append(mySeries, s)
681+
682+
// Add series fingerprint to query limiter; will return error if we are over the limit
683+
limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
684+
if limitErr != nil {
685+
return validation.LimitError(limitErr.Error())
686+
}
687+
688+
// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
689+
if maxChunksLimit > 0 {
690+
actual := numChunks.Add(int32(len(s.Chunks)))
691+
if actual > int32(leftChunksLimit) {
692+
return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
693+
}
694+
}
695+
chunksSize := countChunkBytes(s)
696+
dataSize := countDataBytes(s)
697+
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
698+
return validation.LimitError(chunkBytesLimitErr.Error())
699+
}
700+
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
701+
return validation.LimitError(chunkLimitErr.Error())
702+
}
703+
if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil {
704+
return validation.LimitError(dataBytesLimitErr.Error())
705+
}
706+
707+
return nil
708+
}
709+
673710
for {
674711
// Ensure the context hasn't been canceled in the meanwhile (eg. an error occurred
675712
// in another goroutine).
@@ -708,34 +745,19 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
708745
return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress())
709746
}
710747

711-
// Response may either contain series, warning or hints.
748+
// Response may either contain series, batch, warning or hints.
712749
if s := resp.GetSeries(); s != nil {
713-
mySeries = append(mySeries, s)
714-
715-
// Add series fingerprint to query limiter; will return error if we are over the limit
716-
limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
717-
if limitErr != nil {
718-
return validation.LimitError(limitErr.Error())
750+
if err := processSeries(s); err != nil {
751+
return err
719752
}
753+
}
720754

721-
// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
722-
if maxChunksLimit > 0 {
723-
actual := numChunks.Add(int32(len(s.Chunks)))
724-
if actual > int32(leftChunksLimit) {
725-
return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
755+
if b := resp.GetBatch(); b != nil {
756+
for _, s := range b.Series {
757+
if err := processSeries(s); err != nil {
758+
return err
726759
}
727760
}
728-
chunksSize := countChunkBytes(s)
729-
dataSize := countDataBytes(s)
730-
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
731-
return validation.LimitError(chunkBytesLimitErr.Error())
732-
}
733-
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
734-
return validation.LimitError(chunkLimitErr.Error())
735-
}
736-
if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil {
737-
return validation.LimitError(dataBytesLimitErr.Error())
738-
}
739761
}
740762

741763
if w := resp.GetWarning(); w != "" {
@@ -1044,7 +1066,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
10441066
return valueSets, warnings, queriedBlocks, nil, merr.Err()
10451067
}
10461068

1047-
func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, selectHints *storage.SelectHints, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) {
1069+
func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, selectHints *storage.SelectHints, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr, batchSize int64) (*storepb.SeriesRequest, error) {
10481070
// Selectively query only specific blocks.
10491071
hints := &hintspb.SeriesRequestHints{
10501072
BlockMatchers: []storepb.LabelMatcher{
@@ -1074,6 +1096,7 @@ func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatche
10741096
// TODO: support more downsample levels when downsampling is supported.
10751097
Aggregates: aggrs,
10761098
MaxResolutionWindow: downsample.ResLevel0,
1099+
ResponseBatchSize: batchSize,
10771100
}
10781101

10791102
if selectHints != nil {
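
The refactoring in this file moves the per-series limit checks into one closure so that single-series and batched responses share the same path. The following is a minimal, self-contained sketch of that pattern. `Series`, `Response`, `collect` and `errSeriesLimit` are simplified, hypothetical stand-ins rather than the real `storepb` types or the Cortex query limiter.

```go
package main

import (
	"errors"
	"fmt"
)

// Series and Response are hypothetical stand-ins for storepb.Series and
// storepb.SeriesResponse, reduced to what the sketch needs.
type Series struct {
	Labels string
	Chunks int
}

type Response struct {
	Series *Series   // set when the gateway sends one series per message
	Batch  []*Series // set when the gateway batches several series
}

var errSeriesLimit = errors.New("max series limit reached")

// collect applies identical per-series handling to both response shapes.
// maxSeries == 0 disables the limit.
func collect(responses []Response, maxSeries int) ([]*Series, error) {
	var out []*Series

	// process plays the role of the processSeries closure in the diff:
	// record the series, then enforce the limit.
	process := func(s *Series) error {
		out = append(out, s)
		if maxSeries > 0 && len(out) > maxSeries {
			return errSeriesLimit
		}
		return nil
	}

	for _, resp := range responses {
		if resp.Series != nil {
			if err := process(resp.Series); err != nil {
				return nil, err
			}
		}
		for _, s := range resp.Batch {
			if err := process(s); err != nil {
				return nil, err
			}
		}
	}
	return out, nil
}

func main() {
	responses := []Response{
		{Series: &Series{Labels: `{series="1"}`, Chunks: 1}},
		{Batch: []*Series{
			{Labels: `{series="2"}`, Chunks: 2},
			{Labels: `{series="3"}`, Chunks: 1},
		}},
	}
	got, err := collect(responses, 0)
	fmt.Println(len(got), err)
}
```

Because limits are checked once per series rather than once per message, a batch cannot slip past a limit that would have tripped in the unbatched case.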

pkg/querier/blocks_store_queryable_test.go

Lines changed: 45 additions & 0 deletions
@@ -89,6 +89,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
8989
}
9090

9191
tests := map[string]struct {
92+
seriesBatchSize int64
9293
finderResult bucketindex.Blocks
9394
finderErr error
9495
storeSetResponses []any
@@ -1581,6 +1582,33 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
15811582
},
15821583
},
15831584
},
1585+
"a single store-gateway instance returns a batch of series": {
1586+
seriesBatchSize: 2,
1587+
finderResult: bucketindex.Blocks{
1588+
&bucketindex.Block{ID: block1},
1589+
},
1590+
storeSetResponses: []any{
1591+
map[BlocksStoreClient][]ulid.ULID{
1592+
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
1593+
mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series1Label.Name, series1Label.Value), []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil),
1594+
mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series2Label.Name, series2Label.Value), []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil),
1595+
mockHintsResponse(block1),
1596+
}}: {block1},
1597+
},
1598+
},
1599+
limits: &blocksStoreLimitsMock{},
1600+
queryLimiter: noOpQueryLimiter,
1601+
expectedSeries: []seriesResult{
1602+
{
1603+
lbls: labels.New(metricNameLabel, series1Label),
1604+
values: []valueResult{{t: minT, v: 1}},
1605+
},
1606+
{
1607+
lbls: labels.New(metricNameLabel, series2Label),
1608+
values: []valueResult{{t: minT, v: 2}},
1609+
},
1610+
},
1611+
},
15841612
}
15851613

15861614
for testName, testData := range tests {
@@ -1622,6 +1650,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
16221650
limits: testData.limits,
16231651

16241652
storeGatewayConsistencyCheckMaxAttempts: 3,
1653+
storeGatewaySeriesBatchSize: testData.seriesBatchSize,
16251654
}
16261655

16271656
matchers := []*labels.Matcher{
@@ -1697,6 +1726,22 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
16971726
require.True(t, found)
16981727
}
16991728

1729+
if testData.seriesBatchSize > 0 {
1730+
found := false
1731+
for _, resp := range testData.storeSetResponses {
1732+
if clientsMap, ok := resp.(map[BlocksStoreClient][]ulid.ULID); ok {
1733+
for client := range clientsMap {
1734+
if mockClient, ok := client.(*storeGatewayClientMock); ok {
1735+
// Verify that the store gateway was passed seriesBatchSize.
1736+
assert.Equal(t, testData.seriesBatchSize, mockClient.lastSeriesRequest.ResponseBatchSize)
1737+
found = true
1738+
}
1739+
}
1740+
}
1741+
}
1742+
require.True(t, found)
1743+
}
1744+
17001745
// Assert on metrics (optional, only for test cases defining it).
17011746
if testData.expectedMetrics != "" {
17021747
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics)))

pkg/querier/querier.go

Lines changed: 9 additions & 0 deletions
@@ -81,6 +81,9 @@ type Config struct {
8181
// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
8282
StoreGatewayConsistencyCheckMaxAttempts int `yaml:"store_gateway_consistency_check_max_attempts"`
8383

84+
// The maximum number of series to be batched in a single gRPC response message from Store Gateways.
85+
StoreGatewaySeriesBatchSize int64 `yaml:"store_gateway_series_batch_size"`
86+
8487
// The maximum number of times we attempt fetching data from Ingesters.
8588
IngesterQueryMaxAttempts int `yaml:"ingester_query_max_attempts"`
8689

@@ -109,6 +112,7 @@ var (
109112
errEmptyTimeRange = errors.New("empty time range")
110113
errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)")
111114
errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1")
115+
errInvalidSeriesBatchSize = errors.New("store gateway series batch size should be greater or equal than 0")
112116
errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1")
113117
errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet")
114118
)
@@ -142,6 +146,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
142146
f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
143147
f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.")
144148
f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (ie. due to lower replication factor) than we'll end the retries earlier")
149+
f.Int64Var(&cfg.StoreGatewaySeriesBatchSize, "querier.store-gateway-series-batch-size", 1, "[Experimental] The maximum number of series to be batched in a single gRPC response message from Store Gateways. A value of 0 or 1 disables batching.")
145150
f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).")
146151
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
147152
f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
@@ -179,6 +184,10 @@ func (cfg *Config) Validate() error {
179184
return errInvalidConsistencyCheckAttempts
180185
}
181186

187+
if cfg.StoreGatewaySeriesBatchSize < 0 {
188+
return errInvalidSeriesBatchSize
189+
}
190+
182191
if cfg.IngesterQueryMaxAttempts < 1 {
183192
return errInvalidIngesterQueryMaxAttempts
184193
}
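
The flag registration and validation added above can be exercised in isolation. This sketch reproduces only the new field, the flag name, its default, and the validation rule from the diff. The reduced `Config` type and the `parse` helper are assumptions made for illustration and are not part of Cortex.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// Config is a reduced stand-in for the querier Config; only the field
// added by this commit is kept.
type Config struct {
	StoreGatewaySeriesBatchSize int64
}

var errInvalidSeriesBatchSize = errors.New("store gateway series batch size should be greater or equal than 0")

// RegisterFlags registers the flag with the same name and default as the diff.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.Int64Var(&cfg.StoreGatewaySeriesBatchSize, "querier.store-gateway-series-batch-size", 1,
		"[Experimental] The maximum number of series to be batched in a single gRPC response message from Store Gateways. A value of 0 or 1 disables batching.")
}

// Validate rejects negative batch sizes; 0 and 1 are valid and disable batching.
func (cfg *Config) Validate() error {
	if cfg.StoreGatewaySeriesBatchSize < 0 {
		return errInvalidSeriesBatchSize
	}
	return nil
}

// parse is a hypothetical helper that wires flag parsing and validation together.
func parse(args []string) (*Config, error) {
	cfg := &Config{}
	fs := flag.NewFlagSet("querier", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return cfg, cfg.Validate()
}

func main() {
	for _, args := range [][]string{
		nil,
		{"-querier.store-gateway-series-batch-size=64"},
		{"-querier.store-gateway-series-batch-size=-1"},
	} {
		cfg, err := parse(args)
		fmt.Println(cfg.StoreGatewaySeriesBatchSize, err)
	}
}
```

Running it shows the default of 1 when the flag is omitted, the configured value of 64, and the validation error for -1.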

pkg/querier/querier_test.go

Lines changed: 12 additions & 0 deletions
@@ -1666,6 +1666,18 @@ func TestConfig_Validate(t *testing.T) {
16661666
},
16671667
expected: errInvalidParquetQueryableDefaultBlockStore,
16681668
},
1669+
"should fail if invalid series batch size": {
1670+
setup: func(cfg *Config) {
1671+
cfg.StoreGatewaySeriesBatchSize = -1
1672+
},
1673+
expected: errInvalidSeriesBatchSize,
1674+
},
1675+
"should pass when 0 series batch size": {
1676+
setup: func(cfg *Config) {
1677+
cfg.StoreGatewaySeriesBatchSize = 0
1678+
},
1679+
expected: nil,
1680+
},
16691681
}
16701682

16711683
for testName, testData := range tests {
