From 352b8db2b5d9f8cfd1e03294bfd3b12e113be4aa Mon Sep 17 00:00:00 2001 From: Alexandru Adam Date: Sat, 28 Feb 2026 11:46:12 +0200 Subject: [PATCH] Fix compactor concurrency limit not enforced across compaction levels Signed-off-by: Alexandru Adam --- CHANGELOG.md | 1 + pkg/compactor/compactor_test.go | 125 ++++++++++++++++++ pkg/compactor/partition_compaction_grouper.go | 13 +- .../partition_compaction_grouper_test.go | 90 +++++++++++++ pkg/compactor/shuffle_sharding_grouper.go | 9 +- .../shuffle_sharding_grouper_test.go | 98 ++++++++++++++ 6 files changed, 332 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61eee8a099e..b6c1ea7bd59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [BUGFIX] Compactor: Fix compaction concurrency limit not being respected across compaction levels. When `compaction-concurrency` was set to 1, multiple compactions (e.g., 12h and 24h) could still run in the same pass due to the BucketCompactor loop calling `Groups()` repeatedly. The grouper now tracks cumulative groups returned and enforces the limit across calls. #7298 * [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603 * [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718 * [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727 diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 5724c946990..67367334ef7 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -22,6 +22,7 @@ import ( "github.com/oklog/ulid/v2" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" @@ -2449,3 +2450,127 @@ func TestCompactor_UserIndexUpdateLoop(t *testing.T) { } } } + +func TestCompactor_ShouldRespectConcurrencyLimitAcrossCompactionLevels(t *testing.T) { + block0hto2hUlid := ulid.MustNew(301, nil) + block2hto4hUlid := ulid.MustNew(302, nil) + block4hto6hUlid := ulid.MustNew(303, nil) + block6hto8hUlid := ulid.MustNew(304, nil) + block8hto10hUlid := ulid.MustNew(305, nil) + block10hto12hUlid := ulid.MustNew(306, nil) + block24hto36hUlid := ulid.MustNew(307, nil) + block36hto48hUlid := ulid.MustNew(308, nil) + + h := time.Hour.Milliseconds() + + blocks := map[ulid.ULID]*metadata.Meta{ + block0hto2hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block0hto2hUlid, MinTime: 0, MaxTime: 2 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block2hto4hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block2hto4hUlid, MinTime: 2 * h, MaxTime: 4 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block4hto6hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block4hto6hUlid, MinTime: 4 * h, MaxTime: 6 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block6hto8hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block6hto8hUlid, MinTime: 6 * h, MaxTime: 8 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block8hto10hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block8hto10hUlid, MinTime: 8 * h, MaxTime: 10 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block10hto12hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block10hto12hUlid, MinTime: 10 * h, MaxTime: 12 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block24hto36hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block24hto36hUlid, MinTime: 24 * h, MaxTime: 36 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block36hto48hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block36hto48hUlid, MinTime: 36 * h, MaxTime: 48 * h}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + } + + compactorCfg := &Config{ + BlockRanges: cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}, + } + + limits := &validation.Limits{} + overrides := validation.NewOverrides(*limits, nil) + + rs := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "test-addr"}, + }, + } + subring := &ring.RingMock{} + subring.On("GetAllHealthy", mock.Anything).Return(rs, nil) + + r := &ring.RingMock{} + r.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil) + + registerer := prometheus.NewPedanticRegistry() + blockVisitMarkerReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_read_failed", + }) + blockVisitMarkerWriteFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_write_failed", + }) + + bkt := &bucket.ClientMock{} + bkt.MockUpload(mock.Anything, nil) + bkt.MockGet(mock.Anything, "", nil) + + metrics := newCompactorMetrics(registerer) + + noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark { + return nil + } + + ctx := t.Context() + g := NewShuffleShardingGrouper( + ctx, + nil, + objstore.WithNoopInstr(bkt), + false, + true, + nil, + metadata.NoneFunc, + metrics.getSyncerMetrics("test-user"), + metrics, + *compactorCfg, + r, + "test-addr", + "test-compactor", + overrides, + "test-user", + 10, + 3, + 1, // concurrency = 1 + 5*time.Minute, + blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed, + noCompactFilter, + ) + + totalGroupsAcrossCalls := 0 + for i := 0; i < 5; i++ { + result, err := g.Groups(blocks) + require.NoError(t, err) + totalGroupsAcrossCalls += len(result) + if len(result) == 0 { + break + } + } + + require.Equal(t, 1, totalGroupsAcrossCalls, + "with concurrency=1, the grouper should return at most 1 group total across multiple Groups() calls, "+ + "but got %d (bug: BucketCompactor loop causes cascading compactions at different levels)", totalGroupsAcrossCalls) +} diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 2d308fb6360..293b1d2afec 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -45,6 +45,7 @@ type PartitionCompactionGrouper struct { blockFilesConcurrency int blocksFetchConcurrency int compactionConcurrency int + totalGroupsPlanned int doRandomPick bool @@ -114,6 +115,11 @@ func NewPartitionCompactionGrouper( // Groups function modified from https://github.com/cortexproject/cortex/pull/2616 func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) { + remainingConcurrency := g.compactionConcurrency - g.totalGroupsPlanned + if remainingConcurrency <= 0 { + return nil, nil + } + // Check if this compactor is on the subring. // If the compactor is not on the subring when using the userID as a identifier // no plans generated below will be owned by the compactor so we can just return an empty array @@ -140,7 +146,8 @@ func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) return nil, errors.Wrap(err, "unable to generate compaction jobs") } - pickedPartitionCompactionJobs := g.pickPartitionCompactionJob(partitionCompactionJobs) + pickedPartitionCompactionJobs := g.pickPartitionCompactionJob(partitionCompactionJobs, remainingConcurrency) + g.totalGroupsPlanned += len(pickedPartitionCompactionJobs) return pickedPartitionCompactionJobs, nil } @@ -622,7 +629,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * return nil } -func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompactionJobs []*blocksGroupWithPartition) []*compact.Group { +func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompactionJobs []*blocksGroupWithPartition, remainingConcurrency int) []*compact.Group { var outGroups []*compact.Group for _, partitionedGroup := range partitionCompactionJobs { groupHash := partitionedGroup.groupHash @@ -697,7 +704,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact outGroups = append(outGroups, thanosGroup) level.Debug(partitionedGroupLogger).Log("msg", "added partition to compaction groups") - if len(outGroups) >= g.compactionConcurrency { + if len(outGroups) >= remainingConcurrency { break } } diff --git a/pkg/compactor/partition_compaction_grouper_test.go b/pkg/compactor/partition_compaction_grouper_test.go index 6ca1ee88779..1965694b180 100644 --- a/pkg/compactor/partition_compaction_grouper_test.go +++ b/pkg/compactor/partition_compaction_grouper_test.go @@ -2253,3 +2253,93 @@ type expectedCompactionJob struct { rangeStart int64 rangeEnd int64 } + +func TestPartitionCompactionGrouper_ShouldRespectCumulativeConcurrencyLimit(t *testing.T) { + block1 := ulid.MustNew(201, nil) + block2 := ulid.MustNew(202, nil) + block3 := ulid.MustNew(203, nil) + block4 := ulid.MustNew(204, nil) + + userID := "test-user" + testCompactorID := "test-compactor" + + blocks := map[ulid.ULID]*metadata.Meta{ + block1: { + BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + block2: { + BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + block3: { + BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + block4: { + BlockMeta: tsdb.BlockMeta{ULID: block4, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + } + + compactorCfg := &Config{ + BlockRanges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}, + } + + limits := &validation.Limits{} + overrides := validation.NewOverrides(*limits, nil) + + rs := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "test-addr"}, + }, + } + subring := &ring.RingMock{} + subring.On("GetAllHealthy", mock.Anything).Return(rs, nil) + subring.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(rs, nil) + + r := &ring.RingMock{} + r.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil) + + registerer := prometheus.NewPedanticRegistry() + metrics := newCompactorMetrics(registerer) + + noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark { + return nil + } + + bkt := &bucket.ClientMock{} + bkt.MockUpload(mock.Anything, nil) + bkt.MockGet(mock.Anything, "", nil) + bkt.MockIter(mock.Anything, nil, nil) + + ctx := t.Context() + g := NewPartitionCompactionGrouper( + ctx, + nil, + objstore.WithNoopInstr(bkt), + false, + true, + nil, + metrics.getSyncerMetrics(userID), + metrics, + metadata.NoneFunc, + *compactorCfg, + r, + "test-addr", + testCompactorID, + overrides, + userID, + 10, + 3, + 1, // concurrency = 1 + false, + 5*time.Minute, + noCompactFilter, + 1, + ) + + result1, err := g.Groups(blocks) + require.NoError(t, err) + require.Len(t, result1, 1, "first Groups() call should return exactly 1 group") + + result2, err := g.Groups(blocks) + require.NoError(t, err) + require.Len(t, result2, 0, "second Groups() call should return 0 groups when cumulative concurrency limit is reached") +} diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index d0b3492ceb5..761f16c7f09 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -38,6 +38,7 @@ type ShuffleShardingGrouper struct { blockFilesConcurrency int blocksFetchConcurrency int compactionConcurrency int + totalGroupsPlanned int ring ring.ReadRing ringLifecyclerAddr string @@ -106,6 +107,11 @@ func NewShuffleShardingGrouper( // Groups function modified from https://github.com/cortexproject/cortex/pull/2616 func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) { + remainingConcurrency := g.compactionConcurrency - g.totalGroupsPlanned + if remainingConcurrency <= 0 { + return nil, nil + } + noCompactMarked := g.noCompBlocksFunc() // First of all we have to group blocks using the Thanos default // grouping (based on downsample resolution + external labels). @@ -248,11 +254,12 @@ mainLoop: } outGroups = append(outGroups, thanosGroup) - if len(outGroups) == g.compactionConcurrency { + if len(outGroups) == remainingConcurrency { break mainLoop } } + g.totalGroupsPlanned += len(outGroups) level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups))) return outGroups, nil diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index 3ff91003755..bf24041ab9b 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -418,6 +418,104 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { } } +func TestShuffleShardingGrouper_ShouldRespectCumulativeConcurrencyLimit(t *testing.T) { + block0hto1hUlid := ulid.MustNew(100, nil) + block1hto2hUlid := ulid.MustNew(101, nil) + block2hto3hUlid := ulid.MustNew(102, nil) + block3hto4hUlid := ulid.MustNew(103, nil) + + blocks := map[ulid.ULID]*metadata.Meta{ + block0hto1hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block0hto1hUlid, MinTime: 0, MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block1hto2hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block1hto2hUlid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block2hto3hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block2hto3hUlid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block3hto4hUlid: { + BlockMeta: tsdb.BlockMeta{ULID: block3hto4hUlid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + } + + compactorCfg := &Config{ + BlockRanges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + } + + limits := &validation.Limits{} + overrides := validation.NewOverrides(*limits, nil) + + rs := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "test-addr"}, + }, + } + subring := &ring.RingMock{} + subring.On("GetAllHealthy", mock.Anything).Return(rs, nil) + + r := &ring.RingMock{} + r.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil) + + registerer := prometheus.NewPedanticRegistry() + blockVisitMarkerReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_read_failed", + Help: "Number of block visit marker file failed to be read.", + }) + blockVisitMarkerWriteFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_write_failed", + Help: "Number of block visit marker file failed to be written.", + }) + + bkt := &bucket.ClientMock{} + bkt.MockUpload(mock.Anything, nil) + bkt.MockGet(mock.Anything, "", nil) + + metrics := newCompactorMetrics(registerer) + + noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark { + return nil + } + + ctx := t.Context() + g := NewShuffleShardingGrouper( + ctx, + nil, + objstore.WithNoopInstr(bkt), + false, + true, + nil, + metadata.NoneFunc, + metrics.getSyncerMetrics("test-user"), + metrics, + *compactorCfg, + r, + "test-addr", + "test-compactor", + overrides, + "test-user", + 10, + 3, + 1, // concurrency = 1 + 5*time.Minute, + blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed, + noCompactFilter, + ) + + result1, err := g.Groups(blocks) + require.NoError(t, err) + require.Len(t, result1, 1, "first Groups() call should return exactly 1 group") + + result2, err := g.Groups(blocks) + require.NoError(t, err) + require.Len(t, result2, 0, "second Groups() call should return 0 groups when cumulative concurrency limit is reached") +} + func TestGroupBlocksByCompactableRanges(t *testing.T) { tests := map[string]struct { ranges []int64