Skip to content

[core][flink] Add option to skip expired partitions in compaction job#7537

Open
lilei1128 wants to merge 7 commits into
apache:masterfrom
lilei1128:compact_optimize
Open

[core][flink] Add option to skip expired partitions in compaction job#7537
lilei1128 wants to merge 7 commits into
apache:masterfrom
lilei1128:compact_optimize

Conversation

@lilei1128

Copy link
Copy Markdown
Contributor

Purpose

Add a new option compaction.skip-expired-partitions (default: false)
that allows the Flink compaction job to skip partitions already expired
under the values-time partition expiration strategy, avoiding
unnecessary compaction work on soon-to-be-deleted dataPlease continue.

  • Add CoreOptions.COMPACTION_SKIP_EXPIRED_PARTITIONS config option
  • Filter expired partitions in CompactorSourceBuilder.build() when the option enabled.

Close #7531

Tests

  • Add testSkipExpiredPartitions, testNotSkipExpiredPartitionsByDefault

@yuzelin

yuzelin commented Apr 2, 2026

Copy link
Copy Markdown
Contributor

I have a quick solution: see AbstractFileStore#newPartitionExpire, you can use sql hint or other way to pass dynamic option like

dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), null);
newTable = table.copy(dynamicOptions)
-- use newTable to build job

@lilei1128

Copy link
Copy Markdown
Contributor Author

I have a quick solution: see AbstractFileStore#newPartitionExpire, you can use sql hint or other way to pass dynamic option like

dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), null);
newTable = table.copy(dynamicOptions)
-- use newTable to build job

Thank you for your reply, sql hints can indeed dynamically pass in parameters, but for compact jobs(like the issue), it seems that parameters cannot be passed in through hints.

@JingsongLi JingsongLi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful optimization. Skipping compaction for already-expired partitions avoids wasted work.

Review:

  1. Config: compaction.skip-expired-partitions (default: false) — conservative default is correct. Users opt in explicitly.

  2. Implementation: Filtering in CompactorSourceBuilder.build() using the existing PartitionExpireStrategy — reuses the partition expiration evaluation logic. Clean.

  3. Scope: Only applies when partition.expiration-strategy=values-time. Does it also work with update-time strategy? If not, document the limitation.

  4. Edge case: A partition that is "about to expire" might still receive late-arriving data that resets its expiration time. If we skip compaction, that late data won't be compacted until the partition is no longer expired. Is this acceptable?

  5. +199/-8 with tests. Tests cover both enabled/disabled cases. Good.

  6. Interaction with partition expiration: If the compaction job skips expired partitions, but the expiration job hasn't run yet, the data files remain uncompacted until they're deleted. This is fine — no point compacting data that will be deleted soon.

LGTM.

@JingsongLi

Copy link
Copy Markdown
Contributor

Please rebase master.

@lilei1128

lilei1128 commented May 24, 2026

Copy link
Copy Markdown
Contributor Author

Please rebase master.

Thanks a lot for taking your time to review. I have rebased this PR onto the latest master branch.
Currently only values-time strategy is supported and update-time will be added in a follow-up PR(Including documentation instructions) to keep this change focused.

@lilei1128

Copy link
Copy Markdown
Contributor Author

Hi @JingsongLi, I have added documentation for this feature. Could you please take a look when you have time?

});
dataStream = new DataStreamSource<>(filterStream);
}
CoreOptions coreOptions = table.coreOptions();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter only removes expired partitions from the compaction source, but the compactor sink still uses the table with write-only=false, so table.newCommit(...) will create and run PartitionExpire during the compact commit. As a result, a job that is supposed to skip expired partitions can still delete those partitions as a side effect of compacting the remaining active partitions. Could we either disable partition expiration for this compaction commit path, or document that enabling this option may also expire the skipped partitions during the commit?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review!
You're right that the compactor sink still runs PartitionExpire during commit. However, we intentionally kept this behavior rather than disabling partition expiration in the commit path.

The reason: in the typical deployment pattern where write jobs use write-only=true, the compaction job's commit is the only path that triggers PartitionExpire. If we disable it here too, expired partitions would never be cleaned up automatically — users would have to manually run ExpirePartitionsAction to delete them. That feels worse than the current side effect.

So the goal of this option is specifically to avoid unnecessary compaction IO on partitions that are about to be deleted (no file merging, no writing compacted files that get immediately dropped).

I have updated the documentation (both CoreOptions.java and dedicated-compaction.mdx) to explicitly note that expired partitions may still be deleted during the commit phase as a side effect, so users are aware of this behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] add option to skip expired partitions in compaction job

3 participants