From 5e53cdc06b0398fa56fd585a7d4c94a7455e989e Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Mon, 25 May 2026 11:21:09 +0800 Subject: [PATCH 1/2] [core] Minor refactor partition predicates in FallbackReadScan --- .../paimon/table/ChainGroupReadTable.java | 12 ++--- .../table/FallbackReadFileStoreTable.java | 48 ++++++++++++++----- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java index 6267f3eb04d8..915f46cd8e15 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java @@ -236,7 +236,6 @@ public ChainTableBatchScan withBucketFilter(Filter bucketFilter) { @Override public Plan plan() { List splits = new ArrayList<>(); - PartitionPredicate partitionPredicate = getPartitionPredicate(); PredicateBuilder builder = new PredicateBuilder(tableSchema.logicalPartitionType()); for (Split split : mainScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; @@ -256,11 +255,11 @@ public Plan plan() { Set snapshotPartitions = new HashSet<>( - newChainPartitionListingScan(true, partitionPredicate) + newChainPartitionListingScan(true, getMainPartitionPredicate()) .listPartitions()); DataTableScan deltaPartitionScan = - newChainPartitionListingScan(false, partitionPredicate); + newChainPartitionListingScan(false, getFallbackPartitionPredicate()); List deltaPartitions = deltaPartitionScan.listPartitions().stream() .filter(p -> !snapshotPartitions.contains(p)) @@ -433,9 +432,10 @@ public Plan plan() { @Override public List listPartitionEntries() { - PartitionPredicate partitionPredicate = getPartitionPredicate(); - DataTableScan snapshotScan = newChainPartitionListingScan(true, partitionPredicate); - DataTableScan deltaScan = newChainPartitionListingScan(false, partitionPredicate); + DataTableScan snapshotScan = + newChainPartitionListingScan(true, getMainPartitionPredicate()); + DataTableScan deltaScan = + newChainPartitionListingScan(false, getFallbackPartitionPredicate()); List partitionEntries = new ArrayList<>(snapshotScan.listPartitionEntries()); Set partitions = diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 75353983bb2b..7b0c29d19414 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -369,7 +369,8 @@ public static class FallbackReadScan implements DataTableScan { protected final Function scanCreator; protected final DataTableScan mainScan; protected final DataTableScan fallbackScan; - private PartitionPredicate partitionPredicate; + private PartitionPredicate mainPartitionPredicate; + private PartitionPredicate fallbackPartitionPredicate; public FallbackReadScan( FileStoreTable wrappedTable, @@ -475,6 +476,15 @@ public FallbackReadScan withPartitionFilter(Predicate partitionPredicate) { return this; } + public InnerTableScan withPartitionFilter( + PartitionPredicate mainPartitionPredicate, + PartitionPredicate fallbackPartitionPredicate) { + mainScan.withPartitionFilter(mainPartitionPredicate); + fallbackScan.withPartitionFilter(fallbackPartitionPredicate); + setPartitionPredicate(mainPartitionPredicate, fallbackPartitionPredicate); + return this; + } + @Override public FallbackReadScan withBucketFilter(Filter bucketFilter) { mainScan.withBucketFilter(bucketFilter); @@ -521,15 +531,14 @@ public InnerTableScan dropStats() { public TableScan.Plan plan() { List splits = new ArrayList<>(); Set completePartitions = - new HashSet<>( - newPartitionListingScan(true, partitionPredicate).listPartitions()); + new HashSet<>(newPartitionListingScan(true).listPartitions()); for (Split split : mainScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; splits.add(toFallbackSplit(dataSplit, false)); } List remainingPartitions = - newPartitionListingScan(false, partitionPredicate).listPartitions().stream() + newPartitionListingScan(false).listPartitions().stream() .filter(p -> !completePartitions.contains(p)) .collect(Collectors.toList()); if (!remainingPartitions.isEmpty()) { @@ -543,8 +552,8 @@ public TableScan.Plan plan() { @Override public List listPartitionEntries() { - DataTableScan mainListingScan = newPartitionListingScan(true, partitionPredicate); - DataTableScan fallbackListingScan = newPartitionListingScan(false, partitionPredicate); + DataTableScan mainListingScan = newPartitionListingScan(true); + DataTableScan fallbackListingScan = newPartitionListingScan(false); List partitionEntries = new ArrayList<>(mainListingScan.listPartitionEntries()); Set partitions = @@ -560,18 +569,31 @@ public List listPartitionEntries() { } protected void setPartitionPredicate(PartitionPredicate predicate) { - this.partitionPredicate = predicate; + this.mainPartitionPredicate = predicate; + this.fallbackPartitionPredicate = predicate; + } + + protected void setPartitionPredicate( + PartitionPredicate mainPartitionPredicate, + PartitionPredicate fallbackPartitionPredicate) { + this.mainPartitionPredicate = mainPartitionPredicate; + this.fallbackPartitionPredicate = fallbackPartitionPredicate; + } + + protected PartitionPredicate getMainPartitionPredicate() { + return mainPartitionPredicate; } - protected PartitionPredicate getPartitionPredicate() { - return partitionPredicate; + protected PartitionPredicate getFallbackPartitionPredicate() { + return fallbackPartitionPredicate; } - private DataTableScan newPartitionListingScan( - boolean isMain, PartitionPredicate scanPartitionPredicate) { + private DataTableScan newPartitionListingScan(boolean isMain) { DataTableScan scan = scanCreator.apply(isMain ? wrappedTable : fallbackTable); - if (scanPartitionPredicate != null) { - scan.withPartitionFilter(scanPartitionPredicate); + if (isMain && getMainPartitionPredicate() != null) { + scan.withPartitionFilter(getMainPartitionPredicate()); + } else if (getFallbackPartitionPredicate() != null) { + scan.withPartitionFilter(getFallbackPartitionPredicate()); } return scan; } From fdfc3126407359fe43ebf9faea01acc0a8df778d Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Mon, 25 May 2026 19:51:02 +0800 Subject: [PATCH 2/2] [core] Add test for separate partition predicates in FallbackReadScan --- .../table/FallbackReadFileStoreTable.java | 2 +- .../table/FallbackReadFileStoreTableTest.java | 69 +++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 7b0c29d19414..5a1a5b5a89c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -592,7 +592,7 @@ private DataTableScan newPartitionListingScan(boolean isMain) { DataTableScan scan = scanCreator.apply(isMain ? wrappedTable : fallbackTable); if (isMain && getMainPartitionPredicate() != null) { scan.withPartitionFilter(getMainPartitionPredicate()); - } else if (getFallbackPartitionPredicate() != null) { + } else if (!isMain && getFallbackPartitionPredicate() != null) { scan.withPartitionFilter(getFallbackPartitionPredicate()); } return scan; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index 483ef861eb20..e0841df4d0e4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileIOFinder; @@ -27,8 +28,10 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; @@ -53,9 +56,11 @@ import org.mockito.Mockito; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData; @@ -333,6 +338,70 @@ public org.apache.paimon.reader.RecordReader createReader(Split spl }; } + /** + * Test that FallbackReadScan uses separate partition predicates for main and fallback scans. + * When withPartitionFilter(mainPredicate, fallbackPredicate) is called, plan() should only list + * partitions matching the corresponding predicate from each branch. + */ + @Test + public void testMainAndFallbackPartitionPredicates() throws Exception { + FileStoreTable mainTable = createTable(); + writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20)); + + mainTable.createBranch("bc"); + FileStoreTable branchTable = createTableFromBranch(mainTable, "bc"); + writeDataIntoTable( + branchTable, 0, rowData(1, 100), rowData(2, 200), rowData(3, 300), rowData(4, 400)); + + FallbackReadFileStoreTable table = + new FallbackReadFileStoreTable(mainTable, branchTable, true); + + RowType partitionType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"pt"}); + PartitionPredicate mainPredicate = + PartitionPredicate.fromMultiple( + partitionType, Collections.singletonList(BinaryRow.singleColumn(1))); + PartitionPredicate fallbackPredicate = + PartitionPredicate.fromMultiple( + partitionType, Collections.singletonList(BinaryRow.singleColumn(3))); + + // Case 1: both predicates set, pt=1 from main, pt=3 from fallback + assertThat( + readAndCollect( + table, + scan -> scan.withPartitionFilter(mainPredicate, fallbackPredicate))) + .containsExactlyInAnyOrder(Pair.of(1, 10), Pair.of(3, 300)); + + // Case 2: main predicate is null, fallback predicate set + assertThat(readAndCollect(table, scan -> scan.withPartitionFilter(null, fallbackPredicate))) + .containsExactlyInAnyOrder(Pair.of(1, 10), Pair.of(2, 20), Pair.of(3, 300)); + + // Case 3: main predicate set, fallback predicate is null + assertThat(readAndCollect(table, scan -> scan.withPartitionFilter(mainPredicate, null))) + .containsExactlyInAnyOrder( + Pair.of(1, 10), Pair.of(2, 200), Pair.of(3, 300), Pair.of(4, 400)); + + // Case 4: both null + assertThat(readAndCollect(table, scan -> scan.withPartitionFilter(null, null))) + .containsExactlyInAnyOrder( + Pair.of(1, 10), Pair.of(2, 20), Pair.of(3, 300), Pair.of(4, 400)); + } + + private List> readAndCollect( + FallbackReadFileStoreTable table, + Consumer consumer) + throws Exception { + FallbackReadFileStoreTable.FallbackReadScan scan = + (FallbackReadFileStoreTable.FallbackReadScan) table.newScan(); + consumer.accept(scan); + List> result = new ArrayList<>(); + for (Split split : scan.plan().splits()) { + RecordReader reader = table.newRead().createReader(split); + reader.forEachRemaining(r -> result.add(Pair.of(r.getInt(0), r.getInt(1)))); + reader.close(); + } + return result; + } + @Test void testSwitchToBranch() throws Exception { String branchName = "bc";