scanRows = new ArrayList<>(64);
+ ByteBuffer recordsBuffer = ByteBuffer.wrap(response.getRecords());
+ DefaultValueRecordBatch valueRecords =
+ DefaultValueRecordBatch.pointToByteBuffer(recordsBuffer);
+
+ for (ValueRecord record : valueRecords.records(readContext)) {
+ InternalRow row = record.getRow();
+ if (targetSchemaId != record.schemaId()) {
+ int[] indexMapping =
+ schemaProjectionCache.computeIfAbsent(
+ record.schemaId(),
+ sourceSchemaId ->
+ SchemaUtil.getIndexMapping(
+ schemaGetter.getSchema(sourceSchemaId),
+ schemaGetter.getSchema(targetSchemaId)));
+ row = ProjectedRow.from(indexMapping).replaceRow(row);
+ }
+ if (projectedFields != null) {
+ row = applyColumnProjection(row);
+ }
+ scanRows.add(row);
+ }
+ return scanRows;
+ }
+
+ private InternalRow applyColumnProjection(InternalRow row) {
+ return ProjectedRow.from(projectedFields).replaceRow(row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+
+ if (scannerId != null && hasMoreResults) {
+ // Close scanner on server
+ int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket);
+ TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
+ if (gateway != null) {
+ gateway.scanKv(
+ new ScanKvRequest()
+ .setScannerId(scannerId)
+ .setCallSeqId(callSeqId)
+ .setCloseScanner(true));
+ }
+ }
+
+ if (inFlightRequest != null) {
+ inFlightRequest.cancel(true);
+ }
+ }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvScanITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvScanITCase.java
new file mode 100644
index 0000000000..83103c78a4
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvScanITCase.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests for full KV scan via {@code table.newKvScan().execute()}.
+ *
+ * <p>All non-partitioned tables use {@link #NUM_BUCKETS} buckets to exercise multi-bucket fan-out
+ * on the 3-node cluster provided by {@link ClientToServerITCaseBase}. Each test calls {@link
+ * #waitAllReplicasReady} (or {@link #waitPartitionedTableReplicasReady}) after table creation to
+ * ensure leader election completes before scanning — especially important for empty tables or
+ * partitions, which have no upsert traffic to act as a natural synchronization barrier.
+ */
+public class KvScanITCase extends ClientToServerITCaseBase {
+
+ private static final int NUM_BUCKETS = 3;
+
+ /** Schema shared by all non-partitioned PK tests: (id INT, name STRING), PK = id. */
+ private static final Schema PK_SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ /** Schema shared by partitioned PK tests: (id INT, p STRING, name STRING), PK = (id, p). */
+ private static final Schema PARTITIONED_SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("p", DataTypes.STRING())
+ .column("name", DataTypes.STRING())
+ .primaryKey("id", "p")
+ .build();
+
+ // -------------------------------------------------------------------------
+ // Basic / structural tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testBasicScan() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_basic_scan"));
+
+ UpsertWriter writer = table.newUpsert().createWriter();
+ writer.upsert(row(1, "a"));
+ writer.upsert(row(2, "b"));
+ writer.upsert(row(3, "c"));
+ writer.flush();
+
+ List<InternalRow> result = kvScanAll(table);
+
+ assertThat(result).hasSize(3);
+ result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+ assertThatRow(result.get(0)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(1, "a"));
+ assertThatRow(result.get(1)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(2, "b"));
+ assertThatRow(result.get(2)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(3, "c"));
+ }
+
+ @Test
+ void testEmptyTable() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_empty_table"));
+ // No upsert traffic — waitAllReplicasReady inside createPkTable ensures leaders are ready.
+ assertThat(kvScanAll(table)).isEmpty();
+ }
+
+ @Test
+ void testLargeDataScan() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_large_data_scan"));
+
+ int rowCount = 10_000;
+ UpsertWriter writer = table.newUpsert().createWriter();
+ for (int i = 0; i < rowCount; i++) {
+ writer.upsert(row(i, "val" + i));
+ }
+ writer.flush();
+
+ List<InternalRow> result = kvScanAll(table);
+
+ assertThat(result).hasSize(rowCount);
+ result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+ for (int i = 0; i < rowCount; i++) {
+ assertThatRow(result.get(i))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(i, "val" + i));
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Partitioned table
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testPartitionedTableScan() throws Exception {
+ TablePath tablePath = TablePath.of("test_db", "test_partitioned_scan");
+ long tableId = createPartitionedTable(tablePath);
+
+ admin.createPartition(
+ tablePath, new PartitionSpec(Collections.singletonMap("p", "p1")), false)
+ .get();
+ admin.createPartition(
+ tablePath, new PartitionSpec(Collections.singletonMap("p", "p2")), false)
+ .get();
+ waitPartitionedTableReplicasReady(tableId, tablePath);
+
+ Table table = conn.getTable(tablePath);
+ UpsertWriter writer = table.newUpsert().createWriter();
+ writer.upsert(row(1, "p1", "a1"));
+ writer.upsert(row(2, "p1", "b1"));
+ writer.upsert(row(1, "p2", "a2"));
+ writer.flush();
+
+ List<InternalRow> result = kvScanAll(table);
+ assertThat(result).hasSize(3);
+ result.sort(
+ Comparator.comparingInt((InternalRow r) -> r.getInt(0))
+ .thenComparing(r -> r.getString(1).toString()));
+ assertThatRow(result.get(0))
+ .withSchema(PARTITIONED_SCHEMA.getRowType())
+ .isEqualTo(row(1, "p1", "a1"));
+ assertThatRow(result.get(1))
+ .withSchema(PARTITIONED_SCHEMA.getRowType())
+ .isEqualTo(row(1, "p2", "a2"));
+ assertThatRow(result.get(2))
+ .withSchema(PARTITIONED_SCHEMA.getRowType())
+ .isEqualTo(row(2, "p1", "b1"));
+ }
+
+ @Test
+ void testPartitionedTableEmptyPartition() throws Exception {
+ TablePath tablePath = TablePath.of("test_db", "test_partitioned_empty");
+ long tableId = createPartitionedTable(tablePath);
+
+ // p1 will have data; p2 will be empty
+ admin.createPartition(
+ tablePath, new PartitionSpec(Collections.singletonMap("p", "p1")), false)
+ .get();
+ admin.createPartition(
+ tablePath, new PartitionSpec(Collections.singletonMap("p", "p2")), false)
+ .get();
+ // Explicitly wait for p2's bucket replicas — it has no writes to act as a barrier
+ waitPartitionedTableReplicasReady(tableId, tablePath);
+
+ Table table = conn.getTable(tablePath);
+ UpsertWriter writer = table.newUpsert().createWriter();
+ writer.upsert(row(1, "p1", "a1"));
+ writer.upsert(row(2, "p1", "b1"));
+ writer.flush();
+
+ List<InternalRow> result = kvScanAll(table);
+ // Only p1 rows; p2 is empty and must not contribute any rows
+ assertThat(result).hasSize(2);
+ }
+
+ // -------------------------------------------------------------------------
+ // Data correctness
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testDeleteVisibility() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_delete_visibility"));
+
+ UpsertWriter writer = table.newUpsert().createWriter();
+ writer.upsert(row(1, "a"));
+ writer.upsert(row(2, "b"));
+ writer.upsert(row(3, "c"));
+ writer.flush();
+
+ writer.delete(row(2, "b"));
+ writer.flush();
+
+ List<InternalRow> result = kvScanAll(table);
+
+ assertThat(result).hasSize(2);
+ result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+ assertThatRow(result.get(0)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(1, "a"));
+ assertThatRow(result.get(1)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(3, "c"));
+ }
+
+ @Test
+ void testUpsertOverwrite() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_upsert_overwrite"));
+
+ UpsertWriter writer = table.newUpsert().createWriter();
+ writer.upsert(row(1, "original"));
+ writer.flush();
+
+ writer.upsert(row(1, "updated"));
+ writer.flush();
+
+ List<InternalRow> result = kvScanAll(table);
+
+ assertThat(result).hasSize(1);
+ assertThatRow(result.get(0))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(1, "updated"));
+ }
+
+ /**
+ * Verifies point-in-time snapshot isolation: a scan that completes before any mutations only
+ * sees the original state; a scan that starts after mutations sees the updated state.
+ */
+ @Test
+ void testSnapshotIsolation() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_snapshot_isolation"));
+
+ UpsertWriter writer = table.newUpsert().createWriter();
+ writer.upsert(row(1, "a"));
+ writer.upsert(row(2, "b"));
+ writer.upsert(row(3, "c"));
+ writer.flush();
+
+ // First scan: captures state {1, 2, 3} and fully drains before any mutation.
+ List<InternalRow> beforeMutation = kvScanAll(table);
+
+ // Mutate: add row 4, delete row 1.
+ writer.upsert(row(4, "d"));
+ writer.delete(row(1, "a"));
+ writer.flush();
+
+ // Second scan: fresh snapshot after mutations must see {2, 3, 4}.
+ List<InternalRow> afterMutation = kvScanAll(table);
+
+ assertThat(beforeMutation).hasSize(3);
+ beforeMutation.sort(Comparator.comparingInt(r -> r.getInt(0)));
+ assertThatRow(beforeMutation.get(0))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(1, "a"));
+ assertThatRow(beforeMutation.get(1))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(2, "b"));
+ assertThatRow(beforeMutation.get(2))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(3, "c"));
+
+ assertThat(afterMutation).hasSize(3);
+ afterMutation.sort(Comparator.comparingInt(r -> r.getInt(0)));
+ assertThatRow(afterMutation.get(0))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(2, "b"));
+ assertThatRow(afterMutation.get(1))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(3, "c"));
+ assertThatRow(afterMutation.get(2))
+ .withSchema(PK_SCHEMA.getRowType())
+ .isEqualTo(row(4, "d"));
+ }
+
+ // -------------------------------------------------------------------------
+ // Iterator lifecycle
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testEarlyClose() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_early_close"));
+
+ UpsertWriter writer = table.newUpsert().createWriter();
+ for (int i = 0; i < 1000; i++) {
+ writer.upsert(row(i, "val" + i));
+ }
+ writer.flush();
+
+ int readCount = 0;
+ try (CloseableIterator<InternalRow> iterator = table.newKvScan().execute()) {
+ while (iterator.hasNext() && readCount < 5) {
+ iterator.next();
+ readCount++;
+ }
+ }
+ assertThat(readCount).isEqualTo(5);
+ }
+
+ @Test
+ void testIteratorCloseIdempotent() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_close_idempotent"));
+
+ UpsertWriter writer = table.newUpsert().createWriter();
+ writer.upsert(row(1, "a"));
+ writer.flush();
+
+ CloseableIterator<InternalRow> iterator = table.newKvScan().execute();
+ while (iterator.hasNext()) {
+ iterator.next();
+ }
+ // Second close must be a no-op, not throw
+ iterator.close();
+ iterator.close();
+ }
+
+ // -------------------------------------------------------------------------
+ // Error / guard tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testNonPrimaryKeyTableThrows() throws Exception {
+ TablePath tablePath = TablePath.of("test_db", "test_log_table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .build();
+ TableDescriptor descriptor =
+ TableDescriptor.builder().schema(schema).distributedBy(NUM_BUCKETS).build();
+
+ createTable(tablePath, descriptor, false);
+ Table table = conn.getTable(tablePath);
+
+ assertThatThrownBy(() -> table.newKvScan())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("not a Primary Key Table");
+ }
+
+ // -------------------------------------------------------------------------
+ // Concurrency
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testConcurrentScans() throws Exception {
+ Table table = createPkTable(TablePath.of("test_db", "test_concurrent_scans"));
+
+ int rowCount = 100;
+ UpsertWriter writer = table.newUpsert().createWriter();
+ for (int i = 0; i < rowCount; i++) {
+ writer.upsert(row(i, "val" + i));
+ }
+ writer.flush();
+
+ int concurrency = 4;
+ ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+ AtomicInteger totalRows = new AtomicInteger(0);
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (int t = 0; t < concurrency; t++) {
+ futures.add(
+ executor.submit(
+ () -> {
+ try {
+ totalRows.addAndGet(kvScanAll(table).size());
+ } catch (Exception e) {
+ throw new FlussRuntimeException(e);
+ }
+ }));
+ }
+
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ executor.shutdown();
+
+ // Each concurrent scan must see all rows
+ assertThat(totalRows.get()).isEqualTo(rowCount * concurrency);
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ /**
+ * Creates a non-partitioned primary key table using {@link #PK_SCHEMA} with {@link
+ * #NUM_BUCKETS} buckets, waits for all replicas to be ready, and returns an open {@link Table}.
+ */
+ private Table createPkTable(TablePath path) throws Exception {
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(PK_SCHEMA)
+ .distributedBy(NUM_BUCKETS, "id")
+ .build();
+ long tableId = createTable(path, descriptor, true);
+ waitAllReplicasReady(tableId, NUM_BUCKETS);
+ return conn.getTable(path);
+ }
+
+ /**
+ * Creates a partitioned primary key table using {@link #PARTITIONED_SCHEMA} with {@link
+ * #NUM_BUCKETS} buckets and returns the table ID. Partitions must be created separately before
+ * scanning.
+ */
+ private long createPartitionedTable(TablePath path) throws Exception {
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(PARTITIONED_SCHEMA)
+ .partitionedBy("p")
+ .distributedBy(NUM_BUCKETS, "id")
+ .build();
+ return createTable(path, descriptor, true);
+ }
+
+ /**
+ * Waits until all bucket replicas are ready for every partition of a partitioned table. This is
+ * critical for empty partitions that have no write traffic to act as a natural barrier.
+ */
+ private void waitPartitionedTableReplicasReady(long tableId, TablePath tablePath)
+ throws Exception {
+ List<PartitionInfo> partitions = admin.listPartitionInfos(tablePath).get();
+ for (PartitionInfo partition : partitions) {
+ for (int i = 0; i < NUM_BUCKETS; i++) {
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
+ new TableBucket(tableId, partition.getPartitionId(), i));
+ }
+ }
+ }
+
+ private List<InternalRow> kvScanAll(Table table) throws Exception {
+ List<InternalRow> rows = new ArrayList<>();
+ try (CloseableIterator<InternalRow> iterator = table.newKvScan().execute()) {
+ while (iterator.hasNext()) {
+ rows.add(iterator.next());
+ }
+ }
+ return rows;
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 7e04c3a1cc..8baf4c351e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -439,6 +439,40 @@ public class ConfigOptions {
+ WRITER_ID_EXPIRATION_TIME.key()
+ " passing. The default value is 10 minutes.");
+ public static final ConfigOption<Duration> SERVER_SCANNER_TTL =
+ key("server.scanner.ttl")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(10))
+ .withDescription(
+ "The time that the tablet server will wait without receiving any scan request from "
+ + "a client before expiring the related status. The default value is 10 minutes.");
+
+ public static final ConfigOption<Duration> SERVER_SCANNER_EXPIRATION_INTERVAL =
+ key("server.scanner.expiration-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "How often the TTL reaper runs to close idle scanner sessions. "
+ + "The default value is 30 seconds.");
+
+ public static final ConfigOption<Integer> SERVER_SCANNER_MAX_PER_BUCKET =
+ key("server.scanner.max-per-bucket")
+ .intType()
+ .defaultValue(8)
+ .withDescription(
+ "Maximum number of concurrent scanner sessions per bucket. "
+ + "Exceeding this limit returns TOO_MANY_SCANNERS. "
+ + "The default value is 8.");
+
+ public static final ConfigOption<Integer> SERVER_SCANNER_MAX_PER_SERVER =
+ key("server.scanner.max-per-server")
+ .intType()
+ .defaultValue(200)
+ .withDescription(
+ "Maximum number of concurrent scanner sessions per tablet server. "
+ + "Exceeding this limit returns TOO_MANY_SCANNERS. "
+ + "The default value is 200.");
+
public static final ConfigOption<Integer> TABLET_SERVER_CONTROLLED_SHUTDOWN_MAX_RETRIES =
key("tablet-server.controlled-shutdown.max-retries")
.intType()
@@ -1093,6 +1127,14 @@ public class ConfigOptions {
+ "will still be returned to ensure that the fetch can make progress. As such, "
+ "this is not a absolute maximum.");
+ public static final ConfigOption<MemorySize> CLIENT_SCANNER_KV_FETCH_MAX_BYTES =
+ key("client.scanner.kv.fetch.max-bytes")
+ .memoryType()
+ .defaultValue(MemorySize.parse("1mb"))
+ .withDescription(
+ "The maximum amount of data the server should return for a kv scan request. "
+ + "The default value is 1mb.");
+
+ public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET =
key("client.scanner.log.fetch.max-bytes-for-bucket")
.memoryType()
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index 984a1def4a..b972a80ff8 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -50,6 +50,11 @@ public TableConfig(Configuration config) {
this.config = config;
}
+ /** Gets the table properties configuration. */
+ public Configuration getConfiguration() {
+ return config;
+ }
+
/** Gets the replication factor of the table. */
public int getReplicationFactor() {
return config.get(ConfigOptions.TABLE_REPLICATION_FACTOR);
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/InvalidScanRequestException.java b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidScanRequestException.java
new file mode 100644
index 0000000000..9c105f1b05
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidScanRequestException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Thrown when a ScanKv request is malformed, for example when both {@code scanner_id} and {@code
+ * bucket_scan_req} are set simultaneously, or when neither is set, or when the {@code call_seq_id}
+ * is out of order.
+ */
+public class InvalidScanRequestException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidScanRequestException(String message) {
+ super(message);
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ScannerExpiredException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerExpiredException.java
new file mode 100644
index 0000000000..efdb785113
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerExpiredException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Thrown when a scanner session has expired due to TTL elapsing on the server. The client must
+ * restart the scan from the beginning.
+ */
+public class ScannerExpiredException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public ScannerExpiredException(String message) {
+ super(message);
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java
new file mode 100644
index 0000000000..e146cb3b09
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/** Exception thrown when a scanner is not found. */
+public class ScannerNotFoundException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public ScannerNotFoundException(String message) {
+ super(message);
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/TooManyScannersException.java b/fluss-common/src/main/java/org/apache/fluss/exception/TooManyScannersException.java
new file mode 100644
index 0000000000..4a791b1a98
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/TooManyScannersException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Thrown when the per-bucket or per-server scanner session limit is reached. The client should
+ * back off and retry after a delay.
+ */
+public class TooManyScannersException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public TooManyScannersException(String message) {
+ super(message);
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/UnknownScannerIdException.java b/fluss-common/src/main/java/org/apache/fluss/exception/UnknownScannerIdException.java
new file mode 100644
index 0000000000..a9817466de
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/UnknownScannerIdException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Thrown when a scanner ID is not found on the server and the session was not TTL-expired (e.g.,
+ * server restarted, leadership changed, or the session was explicitly closed). The client must
+ * restart the scan from the beginning.
+ */
+public class UnknownScannerIdException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public UnknownScannerIdException(String message) {
+ super(message);
+ }
+}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java
index 578b74e5e2..932b415650 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java
@@ -42,6 +42,8 @@
import org.apache.fluss.rpc.messages.ProduceLogResponse;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.messages.PutKvResponse;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.rpc.messages.StopReplicaResponse;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
@@ -130,6 +132,15 @@ CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
@RPC(api = ApiKeys.LIMIT_SCAN)
CompletableFuture<LimitScanResponse> limitScan(LimitScanRequest request);
+ /**
+ * Scan kv data from the specified table bucket.
+ *
+ * @param request the scan kv request
+ * @return the scan kv response
+ */
+ @RPC(api = ApiKeys.SCAN_KV)
+ CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request);
+
/**
* List offsets for the specified table bucket.
*
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index cc033ba8a9..89cb14e96e 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -80,7 +80,8 @@ public enum ApiKeys {
REBALANCE(1049, 0, 0, PUBLIC),
LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC),
CANCEL_REBALANCE(1051, 0, 0, PUBLIC),
- PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE);
+ PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE),
+ SCAN_KV(1053, 0, 0, PUBLIC);
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 5ee652cce2..f43e6cfa7a 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -38,6 +38,7 @@
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidReplicationFactorException;
import org.apache.fluss.exception.InvalidRequiredAcksException;
+import org.apache.fluss.exception.InvalidScanRequestException;
import org.apache.fluss.exception.InvalidServerRackInfoException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.InvalidTargetColumnException;
@@ -64,6 +65,8 @@
import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.exception.RecordTooLargeException;
import org.apache.fluss.exception.RetriableAuthenticationException;
+import org.apache.fluss.exception.ScannerExpiredException;
+import org.apache.fluss.exception.ScannerNotFoundException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.SecurityDisabledException;
import org.apache.fluss.exception.SecurityTokenException;
@@ -77,6 +80,8 @@
import org.apache.fluss.exception.TimeoutException;
import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.exception.TooManyPartitionsException;
+import org.apache.fluss.exception.TooManyScannersException;
+import org.apache.fluss.exception.UnknownScannerIdException;
import org.apache.fluss.exception.UnknownServerException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.exception.UnknownWriterIdException;
@@ -240,7 +245,22 @@ public enum Errors {
SEVER_TAG_NOT_EXIST_EXCEPTION(60, "The server tag not exist.", ServerTagNotExistException::new),
REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new),
NO_REBALANCE_IN_PROGRESS_EXCEPTION(
- 62, "No rebalance task in progress.", NoRebalanceInProgressException::new);
+ 62, "No rebalance task in progress.", NoRebalanceInProgressException::new),
+ /** @deprecated Superseded by {@link #UNKNOWN_SCANNER_ID} (65) and {@link #SCANNER_EXPIRED} (64). */
+ @Deprecated
+ SCANNER_NOT_FOUND_EXCEPTION(63, "The scanner is not found.", ScannerNotFoundException::new),
+ SCANNER_EXPIRED(
+ 64,
+ "The scanner session has expired due to inactivity.",
+ ScannerExpiredException::new),
+ UNKNOWN_SCANNER_ID(
+ 65, "The scanner id is not recognized by the server.", UnknownScannerIdException::new),
+ INVALID_SCAN_REQUEST(
+ 66, "The scan request is invalid.", InvalidScanRequestException::new),
+ TOO_MANY_SCANNERS(
+ 67,
+ "The per-bucket or per-server scanner session limit has been reached.",
+ TooManyScannersException::new);
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index db9d614354..12bb81386e 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -255,6 +255,71 @@ message PrefixLookupResponse {
}
+// scan kv request and response
+message ScanKvRequest {
+ // If continuing an existing scan, then you must set scanner_id.
+ // Otherwise, you must set 'bucket_scan_req'.
+ optional bytes scanner_id = 1;
+ optional PbScanReqForBucket bucket_scan_req = 2;
+
+ // The sequence ID of this call. The sequence ID should start at 0
+ // with the request for a new scanner, and after each successful request,
+ // the client should increment it by 1. When retrying a request, the client
+ // should _not_ increment this value. If the server detects that the client
+ // missed a chunk of rows from the middle of a scan, it will respond with an
+ // error.
+ optional uint32 call_seq_id = 3;
+
+ // The maximum number of bytes to send in the response.
+ optional uint32 batch_size_bytes = 4;
+
+ // If set, the server will close the scanner after responding to
+ // this request, regardless of whether all rows have been delivered.
+ optional bool close_scanner = 5;
+}
+
+message PbScanReqForBucket {
+ // The tablet to scan.
+ required int64 table_id = 1;
+ optional int64 partition_id = 2;
+ required int32 bucket_id = 3;
+
+ // The maximum number of rows to scan with the new scanner.
+ //
+ // The scanner will automatically stop yielding results and close itself
+ // after reaching this number of result rows.
+ optional uint64 limit = 4;
+}
+
+message ScanKvResponse {
+ // The error, if an error occurred with this request.
+ optional int32 error_code = 1;
+ optional string error_message = 2;
+
+ // When a scanner is created, returns the scanner ID which may be used
+ // to pull new rows from the scanner.
+ optional bytes scanner_id = 3;
+
+ // Set to true to indicate that there may be further results to be fetched
+ // from this scanner. If the scanner has no more results, then the scanner
+ // ID will become invalid and cannot continue to be used.
+ //
+ // Note that if a scan returns no results, then the initial response from
+ // the first RPC may return false in this flag, in which case there will
+ // be no scanner ID assigned.
+ optional bool has_more_results = 4;
+
+ // The block of returned rows.
+ //
+ // NOTE: the schema-related fields will not be present in this row block.
+ // The schema will match the schema requested by the client when it created
+ // the scanner.
+ optional bytes records = 5;
+
+ // Returns the corresponding log offset at the time the scanner is created
+ optional int64 log_offset = 6;
+}
+
// limit scan request and response
message LimitScanRequest {
required int64 table_id = 2;
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
index 7db3654383..1bfada8c7f 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
@@ -71,6 +71,8 @@
import org.apache.fluss.rpc.messages.ProduceLogResponse;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.messages.PutKvResponse;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.rpc.messages.StopReplicaResponse;
import org.apache.fluss.rpc.messages.TableExistsRequest;
@@ -135,6 +137,11 @@ public CompletableFuture limitScan(LimitScanRequest request)
return null;
}
+ @Override
+ public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+ return null;
+ }
+
@Override
public CompletableFuture listOffsets(ListOffsetsRequest request) {
return null;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
index f3998f4435..d7ab654151 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
@@ -32,6 +32,7 @@
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
import org.rocksdb.Statistics;
import org.rocksdb.WriteOptions;
@@ -151,6 +152,18 @@ public List limitScan(Integer limit) {
return pkList;
}
+ public Snapshot getSnapshot() {
+ return db.getSnapshot();
+ }
+
+ public void releaseSnapshot(Snapshot snapshot) {
+ db.releaseSnapshot(snapshot);
+ }
+
+ public RocksIterator newIterator(ReadOptions readOptions) {
+ return db.newIterator(defaultColumnFamilyHandle, readOptions);
+ }
+
public void put(byte[] key, byte[] value) throws IOException {
try {
db.put(writeOptions, key, value);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java
new file mode 100644
index 0000000000..914a8faeef
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.scan;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
+import org.apache.fluss.server.utils.ResourceGuard;
+import org.apache.fluss.utils.clock.Clock;
+
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The context for a scanner session. Each instance holds a RocksDB snapshot and iterator for
+ * point-in-time scan isolation. The {@link ResourceGuard.Lease} prevents RocksDB from being closed
+ * while this session is active.
+ *
+ * <p>This class is not thread-safe; all fields must only be accessed by a single thread at a time.
+ */
+@NotThreadSafe
+public class ScannerContext implements AutoCloseable {
+ private final byte[] scannerId;
+ /** Pre-computed map key to avoid per-RPC {@code ByteBuffer.wrap(scannerId)} allocation. */
+ private final ByteBuffer scannerIdKey;
+ private final TableBucket tableBucket;
+ private final RocksDBKv rocksDBKv;
+ private final RocksIterator iterator;
+ private final ReadOptions readOptions;
+ private final Snapshot snapshot;
+ private final ResourceGuard.Lease resourceLease;
+
+ private int callSeqId;
+ private long lastAccessTime;
+
+ public ScannerContext(
+ byte[] scannerId,
+ TableBucket tableBucket,
+ RocksDBKv rocksDBKv,
+ RocksIterator iterator,
+ ReadOptions readOptions,
+ Snapshot snapshot,
+ ResourceGuard.Lease resourceLease,
+ Clock clock) {
+ this.scannerId = scannerId;
+ this.scannerIdKey = ByteBuffer.wrap(scannerId);
+ this.tableBucket = tableBucket;
+ this.rocksDBKv = rocksDBKv;
+ this.iterator = iterator;
+ this.readOptions = readOptions;
+ this.snapshot = snapshot;
+ this.resourceLease = resourceLease;
+ this.callSeqId = 0;
+ this.lastAccessTime = clock.milliseconds();
+ }
+
+ public byte[] getScannerId() {
+ return scannerId;
+ }
+
+ public ByteBuffer getScannerIdKey() {
+ return scannerIdKey.duplicate();
+ }
+
+ public TableBucket getTableBucket() {
+ return tableBucket;
+ }
+
+ public RocksIterator getIterator() {
+ return iterator;
+ }
+
+ public int getCallSeqId() {
+ return callSeqId;
+ }
+
+ public void setCallSeqId(int callSeqId) {
+ this.callSeqId = callSeqId;
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public void updateLastAccessTime(long lastAccessTime) {
+ this.lastAccessTime = lastAccessTime;
+ }
+
+ @Override
+ public void close() {
+ try {
+ iterator.close();
+ } finally {
+ try {
+ readOptions.close();
+ } finally {
+ try {
+ rocksDBKv.releaseSnapshot(snapshot);
+ } finally {
+ resourceLease.close();
+ }
+ }
+ }
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java
new file mode 100644
index 0000000000..36fb5f93cd
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.scan;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.TooManyScannersException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.KvTablet;
+import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
+import org.apache.fluss.server.utils.ResourceGuard;
+import org.apache.fluss.utils.AutoCloseableAsync;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FutureUtils;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Manages scanner sessions for KV snapshot queries. Each session holds a RocksDB snapshot and
+ * iterator. Sessions that are idle longer than the TTL are reaped automatically by a background
+ * task.
+ */
+public class ScannerManager implements AutoCloseableAsync {
+ private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class);
+
+ private final Map<ByteBuffer, ScannerContext> scanners = MapUtils.newConcurrentHashMap();
+ /**
+ * Tracks recently-expired scanner IDs and their expiry timestamp. Allows distinguishing between
+ * a scanner that expired (SCANNER_EXPIRED) and one that was never created (UNKNOWN_SCANNER_ID).
+ * Entries are pruned by the cleanup task after {@link #recentlyExpiredRetentionMs} to bound
+ * memory usage.
+ */
+ private final Map<ByteBuffer, Long> recentlyExpiredIds = MapUtils.newConcurrentHashMap();
+ private final Scheduler scheduler;
+ private final Clock clock;
+ private final long scannerTtlMs;
+ /** How long recently-expired IDs are retained for diagnostic error reporting (2 × TTL). */
+ private final long recentlyExpiredRetentionMs;
+ private final int maxPerBucket;
+ private final int maxPerServer;
+ private final AtomicInteger totalScanners = new AtomicInteger(0);
+ private ScheduledFuture<?> cleanupTask;
+
+ public ScannerManager(Configuration conf, Scheduler scheduler) {
+ this(conf, scheduler, SystemClock.getInstance());
+ }
+
+ public ScannerManager(Configuration conf, Scheduler scheduler, Clock clock) {
+ this.scheduler = scheduler;
+ this.clock = clock;
+ this.scannerTtlMs = conf.get(ConfigOptions.SERVER_SCANNER_TTL).toMillis();
+ this.recentlyExpiredRetentionMs = 2 * scannerTtlMs;
+ long expirationIntervalMs =
+ conf.get(ConfigOptions.SERVER_SCANNER_EXPIRATION_INTERVAL).toMillis();
+ this.maxPerBucket = conf.get(ConfigOptions.SERVER_SCANNER_MAX_PER_BUCKET);
+ this.maxPerServer = conf.get(ConfigOptions.SERVER_SCANNER_MAX_PER_SERVER);
+ this.cleanupTask =
+ scheduler.schedule(
+ "scanner-expiration",
+ this::cleanupExpiredScanners,
+ expirationIntervalMs,
+ expirationIntervalMs);
+ }
+
+ /**
+ * Creates a new scanner session for the given KV tablet and returns the {@link ScannerContext},
+ * or {@code null} if the bucket is empty (no rows to scan).
+ *
+ * <p>When {@code null} is returned, no scanner session is registered and no limit slot is
+ * consumed. The caller should return an empty response immediately.
+ *
+ *
+ * <p>Note on limit enforcement: The per-bucket and server-level limit checks are not
+ * atomic with the subsequent {@code scanners.put()}. This is an intentional trade-off: the
+ * limits are soft guards against runaway scanner creation, not hard resource quotas. In the
+ * rare race where two threads both pass the limit check before either registers, the server
+ * may briefly exceed the configured maximum by a small amount.
+ *
+ * @throws TooManyScannersException if per-bucket or per-server limits are exceeded
+ * @throws IOException if the RocksDB resource guard cannot be acquired
+ */
+ @Nullable
+ public ScannerContext createScanner(KvTablet kvTablet, TableBucket tableBucket)
+ throws IOException {
+ // Check server-level limit first (cheap atomic check)
+ if (totalScanners.get() >= maxPerServer) {
+ throw new TooManyScannersException(
+ "Server scanner limit ("
+ + maxPerServer
+ + ") reached. Try again later.");
+ }
+
+ // Check per-bucket limit
+ int bucketCount = countScannersForBucket(tableBucket);
+ if (bucketCount >= maxPerBucket) {
+ throw new TooManyScannersException(
+ "Per-bucket scanner limit ("
+ + maxPerBucket
+ + ") reached for bucket "
+ + tableBucket
+ + ". Try again later.");
+ }
+
+ RocksDBKv rocksDBKv = kvTablet.getRocksDBKv();
+ // Acquire a lease to prevent RocksDB from being closed while iterating.
+ // All subsequent resource allocations are guarded by a try/catch so the lease
+ // is always released on failure.
+ ResourceGuard.Lease lease = rocksDBKv.getResourceGuard().acquireResource();
+ try {
+ Snapshot snapshot = rocksDBKv.getSnapshot();
+ ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
+ RocksIterator iterator = rocksDBKv.newIterator(readOptions);
+ iterator.seekToFirst();
+
+ if (!iterator.isValid()) {
+ // Empty bucket: release all resources without registering a scanner session.
+ iterator.close();
+ readOptions.close();
+ rocksDBKv.releaseSnapshot(snapshot);
+ lease.close();
+ return null;
+ }
+
+ byte[] scannerId = generateScannerId();
+ ScannerContext context =
+ new ScannerContext(
+ scannerId,
+ tableBucket,
+ rocksDBKv,
+ iterator,
+ readOptions,
+ snapshot,
+ lease,
+ clock);
+ scanners.put(context.getScannerIdKey(), context);
+ totalScanners.incrementAndGet();
+ return context;
+ } catch (Exception e) {
+ lease.close();
+ throw e;
+ }
+ }
+
+ /**
+ * Looks up a scanner by id and refreshes its last-access timestamp. Returns {@code null} if
+ * the scanner is not registered (was never created or has already been removed).
+ */
+ @Nullable
+ public ScannerContext getScanner(byte[] scannerId) {
+ ScannerContext context = scanners.get(ByteBuffer.wrap(scannerId));
+ if (context != null) {
+ context.updateLastAccessTime(clock.milliseconds());
+ }
+ return context;
+ }
+
+ /**
+ * Returns {@code true} if the scanner with the given id was recently expired by the TTL reaper.
+ * This allows callers to distinguish a SCANNER_EXPIRED error from an UNKNOWN_SCANNER_ID error
+ * when {@link #getScanner(byte[])} returns {@code null}.
+ */
+ public boolean isRecentlyExpired(byte[] scannerId) {
+ return recentlyExpiredIds.containsKey(ByteBuffer.wrap(scannerId));
+ }
+
+ /**
+ * Removes and closes the scanner identified by {@code scannerId}. No-op if the id is unknown.
+ */
+ public void removeScanner(byte[] scannerId) {
+ ScannerContext context = scanners.remove(ByteBuffer.wrap(scannerId));
+ if (context != null) {
+ totalScanners.decrementAndGet();
+ closeScannerContext(context);
+ }
+ }
+
+ /**
+ * Removes and closes a known scanner context directly, avoiding a map lookup. Prefer this over
+ * {@link #removeScanner(byte[])} when the caller already holds the {@link ScannerContext}.
+ */
+ public void removeScanner(ScannerContext context) {
+ if (scanners.remove(context.getScannerIdKey(), context)) {
+ totalScanners.decrementAndGet();
+ closeScannerContext(context);
+ }
+ }
+
+ /**
+ * Closes and removes all active scanner sessions for the given bucket. Called when a bucket
+ * loses leadership to prevent stale RocksDB snapshot/iterator leaks.
+ */
+ public void closeScannersForBucket(TableBucket tableBucket) {
+ List<ByteBuffer> toRemove = new ArrayList<>();
+ for (Map.Entry<ByteBuffer, ScannerContext> entry : scanners.entrySet()) {
+ if (tableBucket.equals(entry.getValue().getTableBucket())) {
+ toRemove.add(entry.getKey());
+ }
+ }
+ for (ByteBuffer key : toRemove) {
+ ScannerContext context = scanners.remove(key);
+ if (context != null) {
+ totalScanners.decrementAndGet();
+ LOG.info(
+ "Closing scanner {} for bucket {} due to leadership change.",
+ scannerIdToString(key.array()),
+ tableBucket);
+ closeScannerContext(context);
+ }
+ }
+ }
+
+ /**
+ * Counts active scanners for a given bucket. O(n) over total scanners — acceptable because
+ * {@code maxPerServer} is small (default 200) and this path is only on scanner creation.
+ */
+ private int countScannersForBucket(TableBucket tableBucket) {
+ int count = 0;
+ for (ScannerContext ctx : scanners.values()) {
+ if (tableBucket.equals(ctx.getTableBucket())) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private void cleanupExpiredScanners() {
+ long now = clock.milliseconds();
+
+ // Prune stale entries from the recently-expired set to bound memory usage
+ recentlyExpiredIds.entrySet().removeIf(e -> now - e.getValue() > recentlyExpiredRetentionMs);
+
+ for (Map.Entry<ByteBuffer, ScannerContext> entry : scanners.entrySet()) {
+ ScannerContext context = entry.getValue();
+ if (now - context.getLastAccessTime() > scannerTtlMs) {
+ // Atomic conditional remove to avoid double-close race with removeScanner()
+ if (scanners.remove(entry.getKey(), context)) {
+ totalScanners.decrementAndGet();
+ // Record the expiry so subsequent lookups can return SCANNER_EXPIRED
+ recentlyExpiredIds.put(entry.getKey(), now);
+ LOG.info(
+ "Scanner {} expired after {}ms idle, closing it.",
+ scannerIdToString(entry.getKey().array()),
+ scannerTtlMs);
+ closeScannerContext(context);
+ }
+ }
+ }
+ }
+
+ private void closeScannerContext(ScannerContext context) {
+ try {
+ context.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close scanner context.", e);
+ }
+ }
+
+ private byte[] generateScannerId() {
+ return UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8);
+ }
+
+ private String scannerIdToString(byte[] scannerId) {
+ return new String(scannerId, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ try {
+ close();
+ return CompletableFuture.completedFuture(null);
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (cleanupTask != null) {
+ cleanupTask.cancel(false);
+ }
+ // Use the same conditional-remove pattern as the cleanup task so that each ScannerContext
+ // is closed by exactly one caller even if the cleanup task is concurrently running.
+ for (Map.Entry<ByteBuffer, ScannerContext> entry : scanners.entrySet()) {
+ if (scanners.remove(entry.getKey(), entry.getValue())) {
+ totalScanners.decrementAndGet();
+ closeScannerContext(entry.getValue());
+ }
+ }
+ recentlyExpiredIds.clear();
+ totalScanners.set(0);
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 8eed63c844..bf6b3210cc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -41,6 +41,7 @@
import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader;
import org.apache.fluss.server.coordinator.MetadataManager;
import org.apache.fluss.server.kv.KvManager;
+import org.apache.fluss.server.kv.scan.ScannerManager;
import org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter;
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.remote.RemoteLogManager;
@@ -125,6 +126,9 @@ public class TabletServer extends ServerBase {
@GuardedBy("lock")
private TabletService tabletService;
+ @GuardedBy("lock")
+ private ScannerManager scannerManager;
+
@GuardedBy("lock")
private MetricRegistry metricRegistry;
@@ -230,6 +234,8 @@ protected void startServices() throws Exception {
this.kvManager = KvManager.create(conf, zkClient, logManager, tabletServerMetricGroup);
kvManager.startup();
+ this.scannerManager = new ScannerManager(conf, scheduler, clock);
+
// Register kvManager to dynamicConfigManager for dynamic reconfiguration
dynamicConfigManager.register(kvManager);
// Start dynamicConfigManager after all reconfigurable components are registered
@@ -286,6 +292,7 @@ protected void startServices() throws Exception {
metadataManager,
authorizer,
dynamicConfigManager,
+ scannerManager,
ioExecutor);
RequestsMetrics requestsMetrics =
@@ -428,6 +435,12 @@ CompletableFuture stopServices() {
scheduler.shutdown();
}
+ // Close scanner sessions before shutting down kvManager so that
+ // RocksDB snapshots are released while RocksDB is still open.
+ if (scannerManager != null) {
+ scannerManager.close();
+ }
+
if (kvManager != null) {
kvManager.shutdown();
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 992b963334..507abd34ba 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -19,10 +19,13 @@
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.exception.AuthorizationException;
+import org.apache.fluss.exception.NonPrimaryKeyTableException;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.DefaultValueRecordBatch;
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
@@ -50,12 +53,15 @@
import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse;
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
+import org.apache.fluss.rpc.messages.PbScanReqForBucket;
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
import org.apache.fluss.rpc.messages.PrefixLookupResponse;
import org.apache.fluss.rpc.messages.ProduceLogRequest;
import org.apache.fluss.rpc.messages.ProduceLogResponse;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.messages.PutKvResponse;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.rpc.messages.StopReplicaResponse;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
@@ -70,17 +76,25 @@
import org.apache.fluss.server.coordinator.MetadataManager;
import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
+import org.apache.fluss.server.entity.StopReplicaData;
import org.apache.fluss.server.entity.UserContext;
+import org.apache.fluss.server.kv.scan.ScannerContext;
+import org.apache.fluss.server.kv.scan.ScannerManager;
import org.apache.fluss.server.log.FetchParams;
import org.apache.fluss.server.log.ListOffsetsParam;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
import org.apache.fluss.server.metadata.TabletServerMetadataProvider;
+import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.replica.ReplicaManager;
import org.apache.fluss.server.utils.ServerRpcMessageUtils;
import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.rocksdb.RocksIterator;
+
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -124,9 +138,11 @@
public final class TabletService extends RpcServiceBase implements TabletServerGateway {
private final String serviceName;
+ private final int serverId;
private final ReplicaManager replicaManager;
private final TabletServerMetadataCache metadataCache;
private final TabletServerMetadataProvider metadataFunctionProvider;
+ private final ScannerManager scannerManager;
public TabletService(
int serverId,
@@ -137,6 +153,7 @@ public TabletService(
MetadataManager metadataManager,
@Nullable Authorizer authorizer,
DynamicConfigManager dynamicConfigManager,
+ ScannerManager scannerManager,
ExecutorService ioExecutor) {
super(
remoteFileSystem,
@@ -147,8 +164,10 @@ public TabletService(
dynamicConfigManager,
ioExecutor);
this.serviceName = "server-" + serverId;
+ this.serverId = serverId;
this.replicaManager = replicaManager;
this.metadataCache = metadataCache;
+ this.scannerManager = scannerManager;
this.metadataFunctionProvider =
new TabletServerMetadataProvider(zkClient, metadataManager, metadataCache);
}
@@ -285,12 +304,161 @@ public CompletableFuture limitScan(LimitScanRequest request)
return response;
}
+ @Override
+ public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+ CompletableFuture<ScanKvResponse> response = new CompletableFuture<>();
+ try {
+ boolean hasScannerId = request.hasScannerId();
+ boolean hasBucketScanReq = request.hasBucketScanReq();
+
+ if (hasScannerId && hasBucketScanReq) {
+ throw Errors.INVALID_SCAN_REQUEST.exception(
+ "ScanKv request must provide either a scanner_id or a bucket_scan_req, not both.");
+ }
+
+ if (hasScannerId) {
+ byte[] scannerId = request.getScannerId();
+ ScannerContext context = scannerManager.getScanner(scannerId);
+ if (context == null) {
+ String idStr = new String(scannerId, StandardCharsets.UTF_8);
+ if (scannerManager.isRecentlyExpired(scannerId)) {
+ throw Errors.SCANNER_EXPIRED.exception(
+ "Scanner session expired: " + idStr);
+ }
+ throw Errors.UNKNOWN_SCANNER_ID.exception(
+ "Unknown scanner id: " + idStr);
+ }
+
+ if (request.hasCloseScanner() && request.isCloseScanner()) {
+ scannerManager.removeScanner(context);
+ ScanKvResponse scanResponse = new ScanKvResponse();
+ scanResponse.setScannerId(scannerId).setHasMoreResults(false);
+ response.complete(scanResponse);
+ return response;
+ }
+
+ // Validate call sequence to detect reordered or duplicate requests.
+ // Update callSeqId only after continueScan() succeeds so that clients can safely
+ // retry with the same callSeqId if the server returns an error response.
+ int expectedSeq = context.getCallSeqId() + 1;
+ if (request.getCallSeqId() != expectedSeq) {
+ throw Errors.INVALID_SCAN_REQUEST.exception(
+ "Out of order scan request. Expected call_seq_id: "
+ + expectedSeq
+ + ", but got: "
+ + request.getCallSeqId());
+ }
+ ScanKvResponse scanResponse = continueScan(context, request.getBatchSizeBytes());
+ context.setCallSeqId(request.getCallSeqId());
+ response.complete(scanResponse);
+ } else {
+ if (!hasBucketScanReq) {
+ throw Errors.INVALID_SCAN_REQUEST.exception(
+ "ScanKv request must provide either a scanner_id or a bucket_scan_req.");
+ }
+ PbScanReqForBucket bucketScanReq = request.getBucketScanReq();
+ authorizeTable(READ, bucketScanReq.getTableId());
+
+ TableBucket tb =
+ new TableBucket(
+ bucketScanReq.getTableId(),
+ bucketScanReq.hasPartitionId()
+ ? bucketScanReq.getPartitionId()
+ : null,
+ bucketScanReq.getBucketId());
+ Replica replica = replicaManager.getReplicaOrException(tb);
+ if (!replica.isLeader()) {
+ throw new NotLeaderOrFollowerException("Leader not local for bucket " + tb);
+ }
+ if (!replica.isKvTable()) {
+ throw new NonPrimaryKeyTableException(
+ "Table " + bucketScanReq.getTableId() + " is not a primary key table.");
+ }
+
+ // Capture log offset before opening the snapshot so clients get a lower bound
+ // on the WAL position consistent with the data they are about to read.
+ long logHighWatermark = replica.getLogHighWatermark();
+ ScannerContext context = scannerManager.createScanner(replica.getKvTablet(), tb);
+
+ if (context == null) {
+ // Empty bucket: no rows, no scanner registered
+ ScanKvResponse scanResponse = new ScanKvResponse();
+ scanResponse.setHasMoreResults(false).setLogOffset(logHighWatermark);
+ response.complete(scanResponse);
+ return response;
+ }
+
+ ScanKvResponse scanResponse = continueScan(context, request.getBatchSizeBytes());
+ // Return the log offset for clients to determine where to resume reading the log
+ // after the snapshot scan completes.
+ scanResponse.setLogOffset(logHighWatermark);
+ response.complete(scanResponse);
+ }
+ } catch (Exception e) {
+ response.complete(makeScanKvErrorResponse(e));
+ }
+ return response;
+ }
+
+ private ScanKvResponse continueScan(ScannerContext context, int batchSizeBytes)
+ throws IOException {
+ final byte[] scannerId = context.getScannerId();
+ RocksIterator iterator = context.getIterator();
+ DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder();
+ int currentBytes = 0;
+ int rowsInBatch = 0;
+
+ while (iterator.isValid()) {
+ byte[] value = iterator.value();
+ // Always include at least one record; stop if the batch would exceed the size limit
+ if (rowsInBatch > 0 && currentBytes + value.length > batchSizeBytes) {
+ break;
+ }
+
+ builder.append(value);
+ currentBytes += value.length;
+ rowsInBatch++;
+ iterator.next();
+ }
+
+ boolean hasMore = iterator.isValid();
+
+ ScanKvResponse response = new ScanKvResponse();
+ response.setScannerId(scannerId).setHasMoreResults(hasMore);
+ if (rowsInBatch > 0) {
+ DefaultValueRecordBatch batch = builder.build();
+ byte[] records = new byte[batch.sizeInBytes()];
+ batch.getSegment().get(0, records);
+ response.setRecords(records);
+ }
+
+ if (!hasMore) {
+ scannerManager.removeScanner(context);
+ }
+
+ return response;
+ }
+
+ private ScanKvResponse makeScanKvErrorResponse(Throwable e) {
+ ScanKvResponse response = new ScanKvResponse();
+ ApiError error = ApiError.fromThrowable(e);
+ response.setErrorCode(error.error().code()).setErrorMessage(error.message());
+ return response;
+ }
+
@Override
public CompletableFuture notifyLeaderAndIsr(
NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) {
CompletableFuture response = new CompletableFuture<>();
List notifyLeaderAndIsrRequestData =
getNotifyLeaderAndIsrRequestData(notifyLeaderAndIsrRequest);
+ // Close scanner sessions only for buckets where this server is becoming a follower.
+ // Buckets where this server remains or becomes the leader keep their snapshots valid.
+ for (NotifyLeaderAndIsrData data : notifyLeaderAndIsrRequestData) {
+ if (data.getLeader() != serverId) {
+ scannerManager.closeScannersForBucket(data.getTableBucket());
+ }
+ }
replicaManager.becomeLeaderOrFollower(
notifyLeaderAndIsrRequest.getCoordinatorEpoch(),
notifyLeaderAndIsrRequestData,
@@ -326,9 +494,14 @@ public CompletableFuture updateMetadata(UpdateMetadataRe
 public CompletableFuture stopReplica(
 StopReplicaRequest stopReplicaRequest) {
 CompletableFuture response = new CompletableFuture<>();
+ List stopReplicaData = getStopReplicaData(stopReplicaRequest);
+ // Close any active scanner sessions for all buckets being stopped to avoid leaks.
+ // Done before stopReplicas so iterators/snapshots are released prior to teardown.
+ for (StopReplicaData data : stopReplicaData) {
+ scannerManager.closeScannersForBucket(data.getTableBucket());
+ }
 replicaManager.stopReplicas(
 stopReplicaRequest.getCoordinatorEpoch(),
- getStopReplicaData(stopReplicaRequest),
+ stopReplicaData,
 result -> response.complete(makeStopReplicaResponse(result)));
 return response;
 }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 500d197fcf..dc7e15dccc 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -81,6 +81,8 @@
import org.apache.fluss.rpc.messages.ProduceLogResponse;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.messages.PutKvResponse;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.rpc.messages.StopReplicaResponse;
import org.apache.fluss.rpc.messages.TableExistsRequest;
@@ -204,6 +206,11 @@ public CompletableFuture limitScan(LimitScanRequest request)
return null;
}
+ // Test-gateway stub: scanKv is not exercised through this gateway, so — like the
+ // surrounding overrides — it returns null rather than a completed future.
+ @Override
+ public CompletableFuture scanKv(ScanKvRequest request) {
+ return null;
+ }
@Override
public CompletableFuture listOffsets(ListOffsetsRequest request) {
return null;