diff --git a/docs/generated/flink_connector_configuration.html b/docs/generated/flink_connector_configuration.html
index 686349e7bdfc..792f5a699766 100644
--- a/docs/generated/flink_connector_configuration.html
+++ b/docs/generated/flink_connector_configuration.html
@@ -152,6 +152,12 @@
Boolean |
Bounded mode for Paimon consumer. By default, Paimon automatically selects bounded mode based on the mode of the Flink job. |
+
+ scan.bucket |
+ (none) |
+ Integer |
+ Specify a single bucket to scan. This option filters manifest entries and only plans splits for the given bucket. It is only supported for fixed-bucket primary key tables (bucket > 0). It cannot be used with postpone bucket tables. |
+
scan.dedicated-split-generation |
false |
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 553a8487491e..04ebdd8a1275 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -23,6 +23,8 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
@@ -37,6 +39,7 @@
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
/** Implementation for {@link ReadBuilder}. */
@@ -161,10 +164,42 @@ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
@Override
public ReadBuilder withBucket(int bucket) {
+ validateSpecifiedBucket(table, bucket);
this.specifiedBucket = bucket;
return this;
}
+ /**
+ * Validates bucket id before manifest pruning ({@link InnerTableScan#withBucket(int)}). Callers
+ * such as Flink {@code scan.bucket} should route through {@link #withBucket(int)}.
+ */
+ static void validateSpecifiedBucket(InnerTable table, int bucket) {
+ checkArgument(bucket >= 0, "Bucket id must be non-negative, but is %s.", bucket);
+ if (!(table instanceof FileStoreTable)) {
+ throw new IllegalArgumentException(
+ "Bucket scan is only supported for FileStoreTable, but got "
+ + table.getClass().getName());
+ }
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ checkArgument(
+ fileStoreTable.bucketMode() == BucketMode.HASH_FIXED,
+ "Bucket scan is only supported for fixed-bucket tables, but got bucket mode %s.",
+ fileStoreTable.bucketMode());
+ checkArgument(
+ !fileStoreTable.schema().primaryKeys().isEmpty(),
+ "Bucket scan is only supported for primary key tables.");
+ int numBuckets = CoreOptions.fromMap(fileStoreTable.options()).bucket();
+ checkArgument(
+ numBuckets > 0,
+ "Bucket scan is only supported for tables with bucket > 0, but got bucket %s.",
+ numBuckets);
+ checkArgument(
+ bucket < numBuckets,
+ "Bucket id %s must be less than table bucket number %s.",
+ bucket,
+ numBuckets);
+ }
+
@Override
public ReadBuilder withBucketFilter(Filter bucketFilter) {
this.bucketFilter = bucketFilter;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/ReadBuilderImplTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/ReadBuilderImplTest.java
new file mode 100644
index 000000000000..dc66da602c72
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/ReadBuilderImplTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ReadBuilderImpl}. */
+public class ReadBuilderImplTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testValidateSpecifiedBucketOnFixedBucketPrimaryKeyTable() throws Exception {
+ FileStoreTable table = createTable("4", true);
+ assertThatCode(() -> table.newReadBuilder().withBucket(0)).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testValidateSpecifiedBucketRejectsDynamicBucketTable() throws Exception {
+ FileStoreTable table = createTable("-1", true);
+ assertThatThrownBy(() -> table.newReadBuilder().withBucket(0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("fixed-bucket tables")
+ .hasMessageContaining("HASH_DYNAMIC");
+ }
+
+ @Test
+ public void testValidateSpecifiedBucketRejectsPostponeBucketTable() throws Exception {
+ FileStoreTable table = createTable("-2", true);
+ assertThatThrownBy(() -> table.newReadBuilder().withBucket(0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("fixed-bucket tables")
+ .hasMessageContaining("POSTPONE_MODE");
+ }
+
+ @Test
+ public void testValidateSpecifiedBucketRejectsBucketUnawareTable() throws Exception {
+ FileStoreTable table = createBucketUnawareAppendOnlyTable();
+ assertThatThrownBy(() -> table.newReadBuilder().withBucket(0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("fixed-bucket tables")
+ .hasMessageContaining("BUCKET_UNAWARE");
+ }
+
+ @Test
+ public void testValidateSpecifiedBucketRejectsTableWithoutPrimaryKey() throws Exception {
+ FileStoreTable table = createAppendOnlyTable("4");
+ assertThatThrownBy(() -> table.newReadBuilder().withBucket(0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("primary key tables");
+ }
+
+ @Test
+ public void testValidateSpecifiedBucketRejectsOutOfRangeBucketId() throws Exception {
+ FileStoreTable table = createTable("4", true);
+ assertThatThrownBy(() -> table.newReadBuilder().withBucket(4))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Bucket id 4 must be less than table bucket number 4");
+ }
+
+ private FileStoreTable createTable(String bucket, boolean withPrimaryKey) throws Exception {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), bucket);
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder().column("id", DataTypes.INT()).column("v", DataTypes.INT());
+ if (withPrimaryKey) {
+ schemaBuilder.primaryKey("id");
+ }
+ Schema schema = schemaBuilder.options(options).build();
+ return createTable(schema);
+ }
+
+ private FileStoreTable createBucketUnawareAppendOnlyTable() throws Exception {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), "-1");
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .options(options)
+ .build();
+ return createTable(schema);
+ }
+
+ private FileStoreTable createAppendOnlyTable(String bucket) throws Exception {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), bucket);
+ options.put(CoreOptions.BUCKET_KEY.key(), "id");
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .options(options)
+ .build();
+ return createTable(schema);
+ }
+
+ private FileStoreTable createTable(Schema schema) throws Exception {
+ Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+ catalog.createDatabase("default", true);
+ Identifier identifier = Identifier.create("default", "test_bucket");
+ catalog.createTable(identifier, schema, false);
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 61c741fee288..c35e8ef04af0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -274,6 +274,16 @@ public class FlinkConnectorOptions {
+ "normal source, the max partition(s) will be determined before job running "
+ "without refreshing even for streaming jobs.");
+ public static final ConfigOption SCAN_BUCKET =
+ ConfigOptions.key("scan.bucket")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify a single bucket to scan. This option filters manifest entries "
+ + "and only plans splits for the given bucket. It is only supported "
+ + "for fixed-bucket primary key tables (bucket > 0). It cannot be used "
+ + "with postpone bucket tables.");
+
public static final ConfigOption LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
.durationType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 3e96dec1ea50..149f699a6381 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -26,6 +26,7 @@
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.operator.MonitorSource;
+import org.apache.paimon.flink.utils.ScanBucketUtils;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
@@ -200,6 +201,7 @@ private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType
if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
+ ScanBucketUtils.applyScanBucket(table, readBuilder, conf);
return readBuilder.dropStats();
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 66cb49798aa0..e1f509582e26 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -26,6 +26,7 @@
import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
import org.apache.paimon.flink.lookup.PartitionLoader;
import org.apache.paimon.flink.lookup.StaticPartitionLoader;
+import org.apache.paimon.flink.utils.ScanBucketUtils;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
@@ -35,6 +36,7 @@
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -245,13 +247,14 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
protected void scanSplitsForInference() {
if (splitStatistics == null) {
if (table instanceof DataTable) {
- List partitionEntries =
+ ReadBuilder readBuilder =
table.newReadBuilder()
.dropStats()
.withFilter(predicate)
- .withPartitionFilter(partitionPredicate)
- .newScan()
- .listPartitionEntries();
+ .withPartitionFilter(partitionPredicate);
+ ScanBucketUtils.applyScanBucket(table, readBuilder, options);
+ List partitionEntries =
+ readBuilder.newScan().listPartitionEntries();
long totalSize = 0;
long rowCount = 0;
for (PartitionEntry entry : partitionEntries) {
@@ -262,15 +265,14 @@ protected void scanSplitsForInference() {
splitStatistics =
new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
} else {
- List splits =
+ ReadBuilder readBuilder =
table.newReadBuilder()
.dropStats()
.withFilter(predicate)
.withPartitionFilter(partitionPredicate)
- .withProjection(new int[0])
- .newScan()
- .plan()
- .splits();
+ .withProjection(new int[0]);
+ ScanBucketUtils.applyScanBucket(table, readBuilder, options);
+ List splits = readBuilder.newScan().plan().splits();
splitStatistics =
new SplitStatistics(
splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ScanBucketUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ScanBucketUtils.java
new file mode 100644
index 000000000000..19f251233e6a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ScanBucketUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+
+/** Utilities for {@link FlinkConnectorOptions#SCAN_BUCKET}. */
+public class ScanBucketUtils {
+
+ private ScanBucketUtils() {}
+
+ /** Apply {@link FlinkConnectorOptions#SCAN_BUCKET} to the given {@link ReadBuilder}. */
+ public static ReadBuilder applyScanBucket(
+ Table table, ReadBuilder readBuilder, Options options) {
+ Integer scanBucket = options.get(FlinkConnectorOptions.SCAN_BUCKET);
+ if (scanBucket == null) {
+ return readBuilder;
+ }
+ return readBuilder.withBucket(scanBucket);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ScanBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ScanBucketITCase.java
new file mode 100644
index 000000000000..0bfd9adf1cb6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ScanBucketITCase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for {@link org.apache.paimon.flink.FlinkConnectorOptions#SCAN_BUCKET}. */
+public class ScanBucketITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List ddl() {
+ return singletonList(
+ "CREATE TABLE IF NOT EXISTS T (id INT, v INT, PRIMARY KEY (id) NOT ENFORCED) "
+ + "WITH ('bucket' = '4')");
+ }
+
+ @Nullable
+ @Override
+ protected Boolean sqlSyncMode() {
+ return true;
+ }
+
+ @Test
+ public void testScanBucketFilter() throws Exception {
+ FileStoreTable table = paimonTable("T");
+ writeRows(table, 1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 6, 60, 7, 70, 8, 80);
+
+ int targetBucket = 0;
+ for (int bucket = 0; bucket < 4; bucket++) {
+ List files = table.store().newScan().withBucket(bucket).plan().files();
+ if (!files.isEmpty()) {
+ targetBucket = bucket;
+ break;
+ }
+ }
+
+ List expected = readRowsFromBucket(table, targetBucket);
+
+ List actual =
+ batchSql(
+ String.format(
+ "SELECT * FROM T /*+ OPTIONS('scan.bucket' = '%s') */",
+ targetBucket));
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ public void testScanBucketRejectsDynamicBucketTable() {
+ sql(
+ "CREATE TABLE dynamic_t (id INT, v INT, PRIMARY KEY (id) NOT ENFORCED) "
+ + "WITH ('bucket' = '-1')");
+
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "SELECT * FROM dynamic_t /*+ OPTIONS('scan.bucket' = '0') */"))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Bucket scan is only supported for fixed-bucket tables"));
+ }
+
+ @Test
+ public void testScanBucketRejectsBucketUnawareTable() {
+ sql("CREATE TABLE append_t (id INT, v INT) WITH ('bucket' = '-1')");
+
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "SELECT * FROM append_t /*+ OPTIONS('scan.bucket' = '0') */"))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Bucket scan is only supported for fixed-bucket tables"));
+ }
+
+ @Test
+ public void testScanBucketRejectsPostponeBucketTable() {
+ sql(
+ "CREATE TABLE postpone_t (id INT, v INT, PRIMARY KEY (id) NOT ENFORCED) "
+ + "WITH ('bucket' = '-2')");
+
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "SELECT * FROM postpone_t /*+ OPTIONS('scan.bucket' = '0') */"))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Bucket scan is only supported for fixed-bucket tables"));
+ }
+
+ private void writeRows(FileStoreTable table, int... idAndValues) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ for (int i = 0; i < idAndValues.length; i += 2) {
+ write.write(GenericRow.of(idAndValues[i], idAndValues[i + 1]));
+ }
+ BatchTableCommit commit = writeBuilder.newCommit();
+ commit.commit(write.prepareCommit());
+ write.close();
+ commit.close();
+ }
+
+ private List readRowsFromBucket(FileStoreTable table, int bucket) throws Exception {
+ List files = table.store().newScan().withBucket(bucket).plan().files();
+ List rows = new ArrayList<>();
+ for (ManifestEntry file : files) {
+ DataSplit split =
+ DataSplit.builder()
+ .withPartition(file.partition())
+ .withBucket(file.bucket())
+ .withDataFiles(Collections.singletonList(file.file()))
+ .withBucketPath("not used")
+ .build();
+ RecordReader reader = table.newReadBuilder().newRead().createReader(split);
+ RecordReaderIterator iterator = new RecordReaderIterator<>(reader);
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ rows.add(Row.of(row.getInt(0), row.getInt(1)));
+ }
+ iterator.close();
+ }
+ return rows;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/ScanBucketUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/ScanBucketUtilsTest.java
new file mode 100644
index 000000000000..9c122c9d90de
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/ScanBucketUtilsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ScanBucketUtils}. */
+public class ScanBucketUtilsTest extends AbstractTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testInvalidBucket() throws Exception {
+ FileStoreTable table = createPrimaryKeyTable("4");
+ Options options = new Options();
+ options.set(FlinkConnectorOptions.SCAN_BUCKET, 5);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ assertThatThrownBy(() -> ScanBucketUtils.applyScanBucket(table, readBuilder, options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Bucket id 5 must be less than table bucket number 4");
+ }
+
+ @Test
+ public void testRejectDynamicBucketTable() throws Exception {
+ FileStoreTable table = createPrimaryKeyTable("-1");
+ Options options = new Options();
+ options.set(FlinkConnectorOptions.SCAN_BUCKET, 0);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ assertThatThrownBy(() -> ScanBucketUtils.applyScanBucket(table, readBuilder, options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("fixed-bucket tables");
+ }
+
+ @Test
+ public void testRejectBucketUnawareTable() throws Exception {
+ FileStoreTable table = createBucketUnawareAppendOnlyTable();
+ Options options = new Options();
+ options.set(FlinkConnectorOptions.SCAN_BUCKET, 0);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ assertThatThrownBy(() -> ScanBucketUtils.applyScanBucket(table, readBuilder, options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("fixed-bucket tables");
+ }
+
+ @Test
+ public void testRejectTableWithoutPrimaryKey() throws Exception {
+ FileStoreTable table = createAppendOnlyTable("4");
+ Options options = new Options();
+ options.set(FlinkConnectorOptions.SCAN_BUCKET, 0);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ assertThatThrownBy(() -> ScanBucketUtils.applyScanBucket(table, readBuilder, options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("primary key tables");
+ }
+
+ private FileStoreTable createPrimaryKeyTable(String numBuckets) throws Exception {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), numBuckets);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .primaryKey("id")
+ .options(options)
+ .build();
+ return createTable(schema);
+ }
+
+ private FileStoreTable createBucketUnawareAppendOnlyTable() throws Exception {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), "-1");
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .options(options)
+ .build();
+ return createTable(schema);
+ }
+
+ private FileStoreTable createAppendOnlyTable(String numBuckets) throws Exception {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), numBuckets);
+ options.put(CoreOptions.BUCKET_KEY.key(), "id");
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .options(options)
+ .build();
+ return createTable(schema);
+ }
+
+ private FileStoreTable createTable(Schema schema) throws Exception {
+ Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+ catalog.createDatabase("default", true);
+ Identifier identifier = Identifier.create("default", "test_bucket");
+ catalog.createTable(identifier, schema, false);
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
+}