Skip to content

Commit 857f63e

Browse files
author
rockyyin
committed
feat(paimon-core): Add test cases for TableScan partition, bucket, level filters and list APIs
1 parent be172f4 commit 857f63e

1 file changed

Lines changed: 326 additions & 0 deletions

File tree

paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.paimon.table.source;
2020

21+
import org.apache.paimon.manifest.PartitionEntry;
22+
import org.apache.paimon.options.Options;
2123
import org.apache.paimon.predicate.FieldRef;
2224
import org.apache.paimon.predicate.Predicate;
2325
import org.apache.paimon.predicate.PredicateBuilder;
@@ -36,7 +38,9 @@
3638

3739
import java.util.Arrays;
3840
import java.util.Collections;
41+
import java.util.HashMap;
3942
import java.util.List;
43+
import java.util.Map;
4044

4145
import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
4246
import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
@@ -539,4 +543,326 @@ public void testPushDownTopNOnlyNull() throws Exception {
539543
assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions))
540544
.isNull();
541545
}
546+
547+
@Test
548+
public void testPartitionFilter() throws Exception {
549+
// Test partition filter functionality
550+
StreamTableWrite write = table.newWrite(commitUser);
551+
StreamTableCommit commit = table.newCommit(commitUser);
552+
553+
// Write data to multiple partitions
554+
write.write(rowData(1, 10, 100L)); // partition pt=1
555+
write.write(rowData(1, 20, 200L));
556+
commit.commit(0, write.prepareCommit(true, 0));
557+
558+
write.write(rowData(2, 30, 300L)); // partition pt=2
559+
write.write(rowData(2, 40, 400L));
560+
commit.commit(1, write.prepareCommit(true, 1));
561+
562+
write.write(rowData(3, 50, 500L)); // partition pt=3
563+
commit.commit(2, write.prepareCommit(true, 2));
564+
565+
// Without partition filter - should return all data
566+
TableScan.Plan planAll = table.newScan().plan();
567+
List<String> resultAll = getResult(table.newRead(), planAll.splits());
568+
assertThat(resultAll.size()).isEqualTo(5);
569+
570+
// Specify partition filter using Map
571+
Map<String, String> partitionSpec = new HashMap<>();
572+
partitionSpec.put("pt", "1");
573+
TableScan.Plan plan1 = table.newScan().withPartitionFilter(partitionSpec).plan();
574+
List<String> result1 = getResult(table.newRead(), plan1.splits());
575+
assertThat(result1.size()).isEqualTo(2);
576+
assertThat(result1).allMatch(s -> s.contains("1|"));
577+
578+
// Specify partition filter using BinaryRow
579+
TableScan.Plan plan2 =
580+
table.newScan().withPartitionFilter(Collections.singletonList(binaryRow(2))).plan();
581+
List<String> result2 = getResult(table.newRead(), plan2.splits());
582+
assertThat(result2.size()).isEqualTo(2);
583+
assertThat(result2).allMatch(s -> s.contains("2|"));
584+
585+
write.close();
586+
commit.close();
587+
}
588+
589+
@Test
590+
public void testBucketFilter() throws Exception {
591+
createAppendOnlyTable();
592+
593+
// Create table with multiple buckets
594+
Options conf = new Options();
595+
conf.set(org.apache.paimon.CoreOptions.BUCKET, 3);
596+
conf.set(org.apache.paimon.CoreOptions.BUCKET_KEY, "a");
597+
table = createFileStoreTable(false, conf, tablePath);
598+
599+
StreamTableWrite write = table.newWrite(commitUser);
600+
StreamTableCommit commit = table.newCommit(commitUser);
601+
602+
// Write data to different buckets
603+
for (int i = 0; i < 10; i++) {
604+
write.write(rowData(1, i, (long) i * 100));
605+
commit.commit(i, write.prepareCommit(true, i));
606+
}
607+
608+
// Without bucket filter - should return all data
609+
TableScan.Plan planAll = table.newScan().plan();
610+
assertThat(planAll.splits().size()).isEqualTo(10);
611+
612+
// Use bucket filter - only return data from specified bucket
613+
TableScan.Plan planBucket0 = table.newScan().withBucket(0).plan();
614+
assertThat(planBucket0.splits()).allMatch(split -> ((DataSplit) split).bucket() == 0);
615+
616+
// Use bucketFilter - filter out specific buckets
617+
TableScan.Plan planBucketFilter =
618+
table.newScan().withBucketFilter(bucket -> bucket == 1 || bucket == 2).plan();
619+
assertThat(planBucketFilter.splits())
620+
.allMatch(
621+
split -> {
622+
int bucket = ((DataSplit) split).bucket();
623+
return bucket == 1 || bucket == 2;
624+
});
625+
626+
write.close();
627+
commit.close();
628+
}
629+
630+
@Test
631+
public void testLevelFilter() throws Exception {
632+
// Test level filter for primary key table
633+
StreamTableWrite write = table.newWrite(commitUser);
634+
StreamTableCommit commit = table.newCommit(commitUser);
635+
636+
// Write data to trigger compaction and produce files at different levels
637+
for (int i = 0; i < 10; i++) {
638+
write.write(rowData(1, i, (long) i * 100));
639+
commit.commit(i, write.prepareCommit(true, i));
640+
}
641+
642+
// Without level filter
643+
TableScan.Plan planAll = table.newScan().plan();
644+
assertThat(planAll.splits().size()).isGreaterThan(0);
645+
646+
// Use level filter - only return level 0 data
647+
TableScan.Plan planLevel0 = table.newScan().withLevelFilter(level -> level == 0).plan();
648+
for (Split split : planLevel0.splits()) {
649+
DataSplit dataSplit = (DataSplit) split;
650+
assertThat(dataSplit.dataFiles()).allMatch(file -> file.level() == 0);
651+
}
652+
653+
write.close();
654+
commit.close();
655+
}
656+
657+
@Test
658+
public void testListPartitionEntries() throws Exception {
659+
StreamTableWrite write = table.newWrite(commitUser);
660+
StreamTableCommit commit = table.newCommit(commitUser);
661+
662+
// Write data to multiple partitions
663+
write.write(rowData(1, 10, 100L));
664+
commit.commit(0, write.prepareCommit(true, 0));
665+
666+
write.write(rowData(2, 20, 200L));
667+
commit.commit(1, write.prepareCommit(true, 1));
668+
669+
write.write(rowData(3, 30, 300L));
670+
commit.commit(2, write.prepareCommit(true, 2));
671+
672+
// Test listPartitionEntries
673+
List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries();
674+
assertThat(partitionEntries.size()).isEqualTo(3);
675+
676+
// Verify partition values
677+
List<Integer> partitionValues =
678+
partitionEntries.stream()
679+
.map(entry -> entry.partition().getInt(0))
680+
.sorted()
681+
.collect(java.util.stream.Collectors.toList());
682+
assertThat(partitionValues).containsExactly(1, 2, 3);
683+
684+
// Test listPartitions (convenience method)
685+
List<org.apache.paimon.data.BinaryRow> partitions = table.newScan().listPartitions();
686+
assertThat(partitions.size()).isEqualTo(3);
687+
688+
write.close();
689+
commit.close();
690+
}
691+
692+
@Test
693+
public void testPrimaryKeyTableScan() throws Exception {
694+
// Use existing primary key table (default table is primary key table)
695+
StreamTableWrite write = table.newWrite(commitUser);
696+
StreamTableCommit commit = table.newCommit(commitUser);
697+
698+
// Write data
699+
write.write(rowData(1, 10, 100L));
700+
write.write(rowData(1, 20, 200L));
701+
commit.commit(0, write.prepareCommit(true, 0));
702+
703+
// Update data (primary key is pt, a)
704+
write.write(rowData(1, 10, 101L)); // Update data for (1, 10)
705+
commit.commit(1, write.prepareCommit(true, 1));
706+
707+
// Verify scan result - should only have the latest values
708+
TableScan.Plan plan = table.newScan().plan();
709+
List<String> result = getResult(table.newRead(), plan.splits());
710+
assertThat(result.size()).isEqualTo(2);
711+
assertThat(result).containsExactlyInAnyOrder("+I 1|10|101", "+I 1|20|200");
712+
713+
// Delete data
714+
write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 200L));
715+
commit.commit(2, write.prepareCommit(true, 2));
716+
717+
// Verify result after deletion
718+
TableScan.Plan planAfterDelete = table.newScan().plan();
719+
List<String> resultAfterDelete = getResult(table.newRead(), planAfterDelete.splits());
720+
assertThat(resultAfterDelete.size()).isEqualTo(1);
721+
assertThat(resultAfterDelete).containsExactly("+I 1|10|101");
722+
723+
write.close();
724+
commit.close();
725+
}
726+
727+
@Test
728+
public void testEmptyTableScan() throws Exception {
729+
// Test empty table scan
730+
TableScan.Plan plan = table.newScan().plan();
731+
assertThat(plan.splits()).isEmpty();
732+
733+
// Partition list for empty table
734+
List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries();
735+
assertThat(partitionEntries).isEmpty();
736+
}
737+
738+
@Test
739+
public void testScanWithMultipleFilters() throws Exception {
740+
createAppendOnlyTable();
741+
742+
StreamTableWrite write = table.newWrite(commitUser);
743+
StreamTableCommit commit = table.newCommit(commitUser);
744+
745+
// Write test data
746+
for (int pt = 1; pt <= 3; pt++) {
747+
for (int a = 1; a <= 10; a++) {
748+
write.write(rowData(pt, a * 10, (long) pt * 1000 + a * 100));
749+
commit.commit(pt * 100 + a, write.prepareCommit(true, pt * 100 + a));
750+
}
751+
}
752+
753+
// Combine partition filter and column filter
754+
Map<String, String> partitionSpec = new HashMap<>();
755+
partitionSpec.put("pt", "2");
756+
757+
Predicate filter =
758+
new PredicateBuilder(table.schema().logicalRowType())
759+
.greaterOrEqual(1, 50); // a >= 50
760+
761+
TableScan.Plan plan =
762+
table.newScan().withPartitionFilter(partitionSpec).withFilter(filter).plan();
763+
764+
List<String> result = getResult(table.newRead(), plan.splits());
765+
766+
// Verify result: only data with pt=2 and a >= 50
767+
assertThat(result).allMatch(s -> s.contains("2|"));
768+
for (String r : result) {
769+
String[] parts = r.split("\\|");
770+
int aValue = Integer.parseInt(parts[1].trim());
771+
assertThat(aValue).isGreaterThanOrEqualTo(50);
772+
}
773+
774+
write.close();
775+
commit.close();
776+
}
777+
778+
@Test
779+
public void testLimitWithPartitionFilter() throws Exception {
780+
createAppendOnlyTable();
781+
782+
StreamTableWrite write = table.newWrite(commitUser);
783+
StreamTableCommit commit = table.newCommit(commitUser);
784+
785+
// Write data to different partitions
786+
for (int pt = 1; pt <= 3; pt++) {
787+
for (int i = 0; i < 10; i++) {
788+
write.write(rowData(pt, i, (long) pt * 1000 + i * 100));
789+
commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i));
790+
}
791+
}
792+
793+
// Use partition filter + limit
794+
Map<String, String> partitionSpec = new HashMap<>();
795+
partitionSpec.put("pt", "2");
796+
797+
TableScan.Plan plan =
798+
table.newScan().withPartitionFilter(partitionSpec).withLimit(5).plan();
799+
800+
// Should return at most 5 splits (1 row per split)
801+
assertThat(plan.splits().size()).isLessThanOrEqualTo(5);
802+
803+
// All data should come from partition 2
804+
List<String> result = getResult(table.newRead(), plan.splits());
805+
assertThat(result).allMatch(s -> s.contains("2|"));
806+
807+
write.close();
808+
commit.close();
809+
}
810+
811+
@Test
812+
public void testScanAfterCompaction() throws Exception {
813+
// Test scan after compaction for primary key table
814+
StreamTableWrite write = table.newWrite(commitUser);
815+
StreamTableCommit commit = table.newCommit(commitUser);
816+
817+
// Write data with same primary key multiple times to trigger compaction
818+
for (int i = 0; i < 5; i++) {
819+
write.write(rowData(1, 10, 100L + i));
820+
commit.commit(i, write.prepareCommit(true, i));
821+
}
822+
823+
// Scan result should only have the latest value
824+
TableScan.Plan plan = table.newScan().plan();
825+
List<String> result = getResult(table.newRead(), plan.splits());
826+
assertThat(result.size()).isEqualTo(1);
827+
assertThat(result).containsExactly("+I 1|10|104"); // latest value
828+
829+
write.close();
830+
commit.close();
831+
}
832+
833+
@Test
834+
public void testTopNWithPartitionFilter() throws Exception {
835+
createAppendOnlyTable();
836+
837+
StreamTableWrite write = table.newWrite(commitUser);
838+
StreamTableCommit commit = table.newCommit(commitUser);
839+
840+
// Write data to different partitions
841+
for (int pt = 1; pt <= 2; pt++) {
842+
for (int i = 1; i <= 5; i++) {
843+
write.write(rowData(pt, i * 10, (long) pt * 1000 + i * 100));
844+
commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i));
845+
}
846+
}
847+
848+
// Combine partition filter and TopN
849+
Map<String, String> partitionSpec = new HashMap<>();
850+
partitionSpec.put("pt", "1");
851+
852+
DataField field = table.schema().fields().get(1);
853+
FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
854+
855+
TableScan.Plan plan =
856+
table.newScan()
857+
.withPartitionFilter(partitionSpec)
858+
.withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 2))
859+
.plan();
860+
861+
// Verify result: only pt=1 data, and top 2
862+
List<Split> splits = plan.splits();
863+
assertThat(splits.size()).isLessThanOrEqualTo(2);
864+
865+
write.close();
866+
commit.close();
867+
}
542868
}

0 commit comments

Comments
 (0)