[core][flink] Add option to skip expired partitions in compaction job#7537
lilei1128 wants to merge 7 commits into
I have a quick solution: see
Thank you for your reply. SQL hints can indeed pass parameters dynamically, but for compaction jobs (like the one in the issue), it seems that parameters cannot be passed in through hints.
JingsongLi
left a comment
Useful optimization. Skipping compaction for already-expired partitions avoids wasted work.
Review:
- Config: `compaction.skip-expired-partitions` (default: false). The conservative default is correct; users opt in explicitly.
- Implementation: Filtering in `CompactorSourceBuilder.build()` using the existing `PartitionExpireStrategy` reuses the partition expiration evaluation logic. Clean.
- Scope: Only applies when `partition.expiration-strategy=values-time`. Does it also work with the `update-time` strategy? If not, document the limitation.
- Edge case: A partition that is "about to expire" might still receive late-arriving data that resets its expiration time. If we skip compaction, that late data won't be compacted until the partition is no longer expired. Is this acceptable?
- +199/-8 with tests. Tests cover both enabled/disabled cases. Good.
- Interaction with partition expiration: If the compaction job skips expired partitions, but the expiration job hasn't run yet, the data files remain uncompacted until they're deleted. This is fine: there is no point compacting data that will be deleted soon.
LGTM.
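To make the filtering discussed in this review concrete, here is a minimal, self-contained sketch of the idea: before compaction, drop partitions whose partition time is older than the current time minus the expiration time. This is not the PR's code and does not use Paimon's `PartitionExpireStrategy` or `CompactorSourceBuilder`; the class `ExpiredPartitionFilterSketch`, its method names, and the `yyyy-MM-dd` partition value format are assumptions made for illustration only.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the filtering idea; not a Paimon class. */
final class ExpiredPartitionFilterSketch {

    // Assumption: partition values are day strings such as "2024-01-03".
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** A partition counts as expired when its time is strictly before (now - expirationTime). */
    static boolean isExpired(String partitionValue, Duration expirationTime, LocalDateTime now) {
        LocalDateTime partitionTime = LocalDate.parse(partitionValue, DAY).atStartOfDay();
        return partitionTime.isBefore(now.minus(expirationTime));
    }

    /** Returns the partitions to compact; with the flag off, nothing is filtered. */
    static List<String> partitionsToCompact(
            List<String> partitions,
            boolean skipExpiredPartitions,
            Duration expirationTime,
            LocalDateTime now) {
        if (!skipExpiredPartitions) {
            return partitions;
        }
        return partitions.stream()
                .filter(p -> !isExpired(p, expirationTime, now))
                .collect(Collectors.toList());
    }
}
```

With the flag left at its default of false, the sketch returns every partition unchanged, mirroring the opt-in behavior noted in the review.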
Please rebase onto master.
Thanks a lot for taking the time to review. I have rebased this PR onto the latest master branch.
Hi @JingsongLi, I have added documentation for this feature. Could you please take a look when you have time?
});
dataStream = new DataStreamSource<>(filterStream);
}
CoreOptions coreOptions = table.coreOptions();
This filter only removes expired partitions from the compaction source, but the compactor sink still uses the table with write-only=false, so table.newCommit(...) will create and run PartitionExpire during the compact commit. As a result, a job that is supposed to skip expired partitions can still delete those partitions as a side effect of compacting the remaining active partitions. Could we either disable partition expiration for this compaction commit path, or document that enabling this option may also expire the skipped partitions during the commit?
Thanks for the review!
You're right that the compactor sink still runs PartitionExpire during commit. However, we intentionally kept this behavior rather than disabling partition expiration in the commit path.
The reason: in the typical deployment pattern where write jobs use write-only=true, the compaction job's commit is the only path that triggers PartitionExpire. If we disable it here too, expired partitions would never be cleaned up automatically — users would have to manually run ExpirePartitionsAction to delete them. That feels worse than the current side effect.
So the goal of this option is specifically to avoid unnecessary compaction IO on partitions that are about to be deleted (no file merging, no writing compacted files that get immediately dropped).
I have updated the documentation (both CoreOptions.java and dedicated-compaction.mdx) to explicitly note that expired partitions may still be deleted during the commit phase as a side effect, so users are aware of this behavior.
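The behavior described in this reply can be illustrated with a toy model: the source phase skips expired partitions, so no compaction IO is spent on them, while the commit phase still drops them. Everything below is a hypothetical simplification; `CompactJobModel` and its fields are invented for illustration and do not correspond to Paimon's compactor source, sink, or `PartitionExpire` implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a compaction job run; all names are hypothetical, not Paimon classes. */
final class CompactJobModel {
    final Set<String> livePartitions;                   // partitions currently in the table
    final Set<String> expired;                          // partitions considered expired
    final List<String> compacted = new ArrayList<>();   // partitions that incurred compaction IO
    final List<String> dropped = new ArrayList<>();     // partitions deleted during commit

    CompactJobModel(List<String> partitions, Set<String> expired) {
        this.livePartitions = new LinkedHashSet<>(partitions);
        this.expired = expired;
    }

    void run(boolean skipExpiredPartitions) {
        // Source phase: with the option enabled, expired partitions are never read or rewritten.
        for (String p : livePartitions) {
            if (skipExpiredPartitions && expired.contains(p)) {
                continue;
            }
            compacted.add(p);
        }
        // Commit phase: expiration still runs regardless of the option,
        // so skipped partitions may be deleted here as a side effect.
        for (String p : expired) {
            if (livePartitions.remove(p)) {
                dropped.add(p);
            }
        }
    }
}
```

In this model the option changes only which partitions are compacted; the set of dropped partitions is the same whether it is enabled or not.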
Purpose
Add a new option `compaction.skip-expired-partitions` (default: false) that allows the Flink compaction job to skip partitions already expired under the `values-time` partition expiration strategy, avoiding unnecessary compaction work on soon-to-be-deleted data.
Close #7531
Tests