Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [BUGFIX] Compactor: Fix compaction concurrency limit not being respected across compaction levels. When `compaction-concurrency` was set to 1, multiple compactions (e.g., 12h and 24h) could still run in the same pass due to the BucketCompactor loop calling `Groups()` repeatedly. The grouper now tracks cumulative groups returned and enforces the limit across calls. #7298
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
Expand Down
125 changes: 125 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/oklog/ulid/v2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -2449,3 +2450,127 @@ func TestCompactor_UserIndexUpdateLoop(t *testing.T) {
}
}
}

// TestCompactor_ShouldRespectConcurrencyLimitAcrossCompactionLevels verifies that the
// shuffle-sharding grouper enforces compaction-concurrency cumulatively across repeated
// Groups() calls. Before the fix, the BucketCompactor loop calling Groups() again after
// each pass allowed compactions at different time ranges (e.g. 12h and 24h) to be
// planned in the same cycle even with compaction-concurrency set to 1.
func TestCompactor_ShouldRespectConcurrencyLimitAcrossCompactionLevels(t *testing.T) {
	block0hto2hUlid := ulid.MustNew(301, nil)
	block2hto4hUlid := ulid.MustNew(302, nil)
	block4hto6hUlid := ulid.MustNew(303, nil)
	block6hto8hUlid := ulid.MustNew(304, nil)
	block8hto10hUlid := ulid.MustNew(305, nil)
	block10hto12hUlid := ulid.MustNew(306, nil)
	block24hto36hUlid := ulid.MustNew(307, nil)
	block36hto48hUlid := ulid.MustNew(308, nil)

	h := time.Hour.Milliseconds()

	// All blocks carry the same external labels so they land in the same top-level
	// grouping. The 0h-12h blocks are candidates for a 12h-range compaction while the
	// 24h-48h blocks are candidates for a 24h-range compaction, i.e. two independent
	// jobs at different compaction levels.
	newMeta := func(id ulid.ULID, minT, maxT int64) *metadata.Meta {
		return &metadata.Meta{
			BlockMeta: tsdb.BlockMeta{ULID: id, MinTime: minT, MaxTime: maxT},
			Thanos:    metadata.Thanos{Labels: map[string]string{"external": "1"}},
		}
	}

	blocks := map[ulid.ULID]*metadata.Meta{
		block0hto2hUlid:   newMeta(block0hto2hUlid, 0, 2*h),
		block2hto4hUlid:   newMeta(block2hto4hUlid, 2*h, 4*h),
		block4hto6hUlid:   newMeta(block4hto6hUlid, 4*h, 6*h),
		block6hto8hUlid:   newMeta(block6hto8hUlid, 6*h, 8*h),
		block8hto10hUlid:  newMeta(block8hto10hUlid, 8*h, 10*h),
		block10hto12hUlid: newMeta(block10hto12hUlid, 10*h, 12*h),
		block24hto36hUlid: newMeta(block24hto36hUlid, 24*h, 36*h),
		block36hto48hUlid: newMeta(block36hto48hUlid, 36*h, 48*h),
	}

	compactorCfg := &Config{
		BlockRanges: cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
	}

	limits := &validation.Limits{}
	overrides := validation.NewOverrides(*limits, nil)

	// Single healthy instance so this compactor owns every group.
	rs := ring.ReplicationSet{
		Instances: []ring.InstanceDesc{
			{Addr: "test-addr"},
		},
	}
	subring := &ring.RingMock{}
	subring.On("GetAllHealthy", mock.Anything).Return(rs, nil)

	r := &ring.RingMock{}
	r.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil)

	registerer := prometheus.NewPedanticRegistry()
	blockVisitMarkerReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{
		Name: "cortex_compactor_block_visit_marker_read_failed",
	})
	blockVisitMarkerWriteFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{
		Name: "cortex_compactor_block_visit_marker_write_failed",
	})

	bkt := &bucket.ClientMock{}
	bkt.MockUpload(mock.Anything, nil)
	bkt.MockGet(mock.Anything, "", nil)

	metrics := newCompactorMetrics(registerer)

	noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark {
		return nil
	}

	ctx := t.Context()
	g := NewShuffleShardingGrouper(
		ctx,
		nil,
		objstore.WithNoopInstr(bkt),
		false,
		true,
		nil,
		metadata.NoneFunc,
		metrics.getSyncerMetrics("test-user"),
		metrics,
		*compactorCfg,
		r,
		"test-addr",
		"test-compactor",
		overrides,
		"test-user",
		10,
		3,
		1, // concurrency = 1
		5*time.Minute,
		blockVisitMarkerReadFailed,
		blockVisitMarkerWriteFailed,
		noCompactFilter,
	)

	// Simulate the BucketCompactor loop: it keeps calling Groups() until no more
	// groups are returned. The cumulative number of groups must honor the limit.
	totalGroupsAcrossCalls := 0
	for i := 0; i < 5; i++ {
		result, err := g.Groups(blocks)
		require.NoError(t, err)
		totalGroupsAcrossCalls += len(result)
		if len(result) == 0 {
			break
		}
	}

	// Two independent compaction jobs exist, so without the cumulative limit the
	// loop would plan both; with concurrency=1 exactly one must be planned.
	// NOTE: the message previously said "at most 1" which contradicted the exact
	// equality assertion below.
	require.Equal(t, 1, totalGroupsAcrossCalls,
		"with concurrency=1, the grouper should return exactly 1 group total across multiple Groups() calls, "+
			"but got %d (bug: BucketCompactor loop causes cascading compactions at different levels)", totalGroupsAcrossCalls)
}
13 changes: 10 additions & 3 deletions pkg/compactor/partition_compaction_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type PartitionCompactionGrouper struct {
blockFilesConcurrency int
blocksFetchConcurrency int
compactionConcurrency int
totalGroupsPlanned int

doRandomPick bool

Expand Down Expand Up @@ -114,6 +115,11 @@ func NewPartitionCompactionGrouper(

// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
remainingConcurrency := g.compactionConcurrency - g.totalGroupsPlanned
if remainingConcurrency <= 0 {
return nil, nil
}

// Check if this compactor is on the subring.
// If the compactor is not on the subring when using the userID as a identifier
// no plans generated below will be owned by the compactor so we can just return an empty array
Expand All @@ -140,7 +146,8 @@ func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta)
return nil, errors.Wrap(err, "unable to generate compaction jobs")
}

pickedPartitionCompactionJobs := g.pickPartitionCompactionJob(partitionCompactionJobs)
pickedPartitionCompactionJobs := g.pickPartitionCompactionJob(partitionCompactionJobs, remainingConcurrency)
g.totalGroupsPlanned += len(pickedPartitionCompactionJobs)

return pickedPartitionCompactionJobs, nil
}
Expand Down Expand Up @@ -622,7 +629,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo *
return nil
}

func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompactionJobs []*blocksGroupWithPartition) []*compact.Group {
func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompactionJobs []*blocksGroupWithPartition, remainingConcurrency int) []*compact.Group {
var outGroups []*compact.Group
for _, partitionedGroup := range partitionCompactionJobs {
groupHash := partitionedGroup.groupHash
Expand Down Expand Up @@ -697,7 +704,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact

outGroups = append(outGroups, thanosGroup)
level.Debug(partitionedGroupLogger).Log("msg", "added partition to compaction groups")
if len(outGroups) >= g.compactionConcurrency {
if len(outGroups) >= remainingConcurrency {
break
}
}
Expand Down
90 changes: 90 additions & 0 deletions pkg/compactor/partition_compaction_grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2253,3 +2253,93 @@ type expectedCompactionJob struct {
rangeStart int64
rangeEnd int64
}

// TestPartitionCompactionGrouper_ShouldRespectCumulativeConcurrencyLimit checks that
// once the partition grouper has handed out as many groups as the configured
// compaction concurrency, subsequent Groups() calls return nothing.
func TestPartitionCompactionGrouper_ShouldRespectCumulativeConcurrencyLimit(t *testing.T) {
	var (
		block1 = ulid.MustNew(201, nil)
		block2 = ulid.MustNew(202, nil)
		block3 = ulid.MustNew(203, nil)
		block4 = ulid.MustNew(204, nil)
	)

	userID := "test-user"
	testCompactorID := "test-compactor"

	// Two pairs of overlapping level-1 blocks => two candidate compaction jobs.
	levelOneMeta := func(id ulid.ULID, minT, maxT int64) *metadata.Meta {
		return &metadata.Meta{
			BlockMeta: tsdb.BlockMeta{ULID: id, MinTime: minT, MaxTime: maxT, Compaction: tsdb.BlockMetaCompaction{Level: 1}},
		}
	}
	blocks := map[ulid.ULID]*metadata.Meta{
		block1: levelOneMeta(block1, 0*H, 2*H),
		block2: levelOneMeta(block2, 0*H, 2*H),
		block3: levelOneMeta(block3, 2*H, 4*H),
		block4: levelOneMeta(block4, 2*H, 4*H),
	}

	compactorCfg := &Config{
		BlockRanges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
	}

	limits := &validation.Limits{}
	overrides := validation.NewOverrides(*limits, nil)

	// Single-instance ring: this compactor owns everything.
	rs := ring.ReplicationSet{
		Instances: []ring.InstanceDesc{
			{Addr: "test-addr"},
		},
	}
	subring := &ring.RingMock{}
	subring.On("GetAllHealthy", mock.Anything).Return(rs, nil)
	subring.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(rs, nil)

	r := &ring.RingMock{}
	r.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil)

	registerer := prometheus.NewPedanticRegistry()
	metrics := newCompactorMetrics(registerer)

	noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark {
		return nil
	}

	bkt := &bucket.ClientMock{}
	bkt.MockUpload(mock.Anything, nil)
	bkt.MockGet(mock.Anything, "", nil)
	bkt.MockIter(mock.Anything, nil, nil)

	ctx := t.Context()
	g := NewPartitionCompactionGrouper(
		ctx,
		nil,
		objstore.WithNoopInstr(bkt),
		false,
		true,
		nil,
		metrics.getSyncerMetrics(userID),
		metrics,
		metadata.NoneFunc,
		*compactorCfg,
		r,
		"test-addr",
		testCompactorID,
		overrides,
		userID,
		10,
		3,
		1, // concurrency = 1
		false,
		5*time.Minute,
		noCompactFilter,
		1,
	)

	// First call may plan up to the concurrency limit.
	result1, err := g.Groups(blocks)
	require.NoError(t, err)
	require.Len(t, result1, 1, "first Groups() call should return exactly 1 group")

	// Second call must plan nothing: the cumulative budget is exhausted.
	result2, err := g.Groups(blocks)
	require.NoError(t, err)
	require.Len(t, result2, 0, "second Groups() call should return 0 groups when cumulative concurrency limit is reached")
}
9 changes: 8 additions & 1 deletion pkg/compactor/shuffle_sharding_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ShuffleShardingGrouper struct {
blockFilesConcurrency int
blocksFetchConcurrency int
compactionConcurrency int
totalGroupsPlanned int

ring ring.ReadRing
ringLifecyclerAddr string
Expand Down Expand Up @@ -106,6 +107,11 @@ func NewShuffleShardingGrouper(

// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
remainingConcurrency := g.compactionConcurrency - g.totalGroupsPlanned
if remainingConcurrency <= 0 {
return nil, nil
}

noCompactMarked := g.noCompBlocksFunc()
// First of all we have to group blocks using the Thanos default
// grouping (based on downsample resolution + external labels).
Expand Down Expand Up @@ -248,11 +254,12 @@ mainLoop:
}

outGroups = append(outGroups, thanosGroup)
if len(outGroups) == g.compactionConcurrency {
if len(outGroups) == remainingConcurrency {
break mainLoop
}
}

g.totalGroupsPlanned += len(outGroups)
level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups)))

return outGroups, nil
Expand Down
Loading