diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java index 8532f2a856..20d5fcd84b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java @@ -23,7 +23,9 @@ import org.apache.fluss.client.lookup.TableLookup; import org.apache.fluss.client.metadata.ClientSchemaGetter; import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.KvScan; import org.apache.fluss.client.table.scanner.TableScan; +import org.apache.fluss.client.table.scanner.TableKvScan; import org.apache.fluss.client.table.writer.Append; import org.apache.fluss.client.table.writer.TableAppend; import org.apache.fluss.client.table.writer.TableUpsert; @@ -67,6 +69,15 @@ public Scan newScan() { return new TableScan(conn, tableInfo, schemaGetter); } + @Override + public KvScan newKvScan() { + checkState( + hasPrimaryKey, + "Table %s is not a Primary Key Table and doesn't support KvScan.", + tablePath); + return new TableKvScan(conn, tableInfo, schemaGetter); + } + @Override public Lookup newLookup() { return new TableLookup( diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java b/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java index 813b62034a..11fa4fd36d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.lookup.Lookup; import org.apache.fluss.client.lookup.Lookuper; +import org.apache.fluss.client.table.scanner.KvScan; import org.apache.fluss.client.table.scanner.Scan; import org.apache.fluss.client.table.writer.Append; import org.apache.fluss.client.table.writer.AppendWriter; @@ -55,6 +56,12 @@ public interface Table 
extends AutoCloseable { */ Scan newScan(); + /** + * Creates a new {@link KvScan} for this table to read all live KV data from the primary key + * table's RocksDB store (requires to be a Primary Key Table). + */ + KvScan newKvScan(); + /** * Creates a new {@link Lookup} for this table to configure and create a {@link Lookuper} to * lookup data for this table by primary key or a prefix of primary key. diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/KvScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/KvScan.java new file mode 100644 index 0000000000..b018467ff0 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/KvScan.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +/** + * Used to configure and execute a full scan over the KV (RocksDB) state of a primary key table. + * + *

+ * <p>The scan reads all live rows from every bucket, handling partitioning and bucket enumeration
+ * automatically. Each call to {@link #execute()} opens a point-in-time RocksDB snapshot on each
+ * tablet server and streams the rows back to the client.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * try (CloseableIterator<InternalRow> rows = table.newKvScan().execute()) {
+ *     while (rows.hasNext()) {
+ *         InternalRow row = rows.next();
+ *         // process row
+ *     }
+ * }
+ * }</pre>
+ * + * @since 0.9 + */ +@PublicEvolving +public interface KvScan { + + /** + * Executes the KV scan to read all current data in the table. Partitions and buckets are + * enumerated automatically; the caller only needs to iterate the returned rows. + * + * @return a closeable iterator of the rows in the table; must be closed after use + * @throws org.apache.fluss.exception.FlussRuntimeException if partition enumeration fails or + * an error occurs while reading rows from the server + */ + CloseableIterator execute(); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java new file mode 100644 index 0000000000..26b6c2920a --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.table.scanner; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +/** + * Used to configure and execute a snapshot query to read all kv data of a primary key table. + * + * @since 0.9 + */ +@PublicEvolving +public interface SnapshotQuery { + /** + * Executes the snapshot query to read all current data in the given table bucket. + * + * @param tableBucket the table bucket to read + * @return a closeable iterator of the rows in the table bucket + */ + CloseableIterator execute(TableBucket tableBucket); + + /** + * Executes the snapshot query to read all current data in the table. Everything around + * partitions and buckets will be taken care of from the client. + * + * @return a closeable iterator of the rows in the table + */ + CloseableIterator execute(); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableKvScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableKvScan.java new file mode 100644 index 0000000000..9f6e214b12 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableKvScan.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner; + +import org.apache.fluss.client.FlussConnection; +import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.client.table.scanner.batch.KvBatchScanner; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** Implementation of {@link KvScan}. */ +public class TableKvScan implements KvScan { + + /** Timeout used when polling each batch from the server-side scanner. 
*/ + private static final Duration BATCH_POLL_TIMEOUT = Duration.ofSeconds(30); + + private final FlussConnection conn; + private final TableInfo tableInfo; + private final SchemaGetter schemaGetter; + + public TableKvScan(FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) { + this.conn = conn; + this.tableInfo = tableInfo; + this.schemaGetter = schemaGetter; + } + + @Override + public CloseableIterator execute() { + List buckets = new ArrayList<>(); + try { + if (tableInfo.isPartitioned()) { + List partitions = + conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get(); + for (PartitionInfo partition : partitions) { + for (int i = 0; i < tableInfo.getNumBuckets(); i++) { + buckets.add( + new TableBucket( + tableInfo.getTableId(), partition.getPartitionId(), i)); + } + } + } else { + for (int i = 0; i < tableInfo.getNumBuckets(); i++) { + buckets.add(new TableBucket(tableInfo.getTableId(), i)); + } + } + } catch (Exception e) { + throw new FlussRuntimeException( + "Failed to list partitions for table " + tableInfo.getTablePath(), e); + } + + Scan scan = new TableScan(conn, tableInfo, schemaGetter); + return new MultiBucketIterator(buckets, scan); + } + + private static class MultiBucketIterator implements CloseableIterator { + private final List buckets; + private final Scan scan; + private int nextBucketIndex; + @Nullable private BatchScanner prefetchedScanner; + private CloseableIterator currentScannerIterator; + private boolean isClosed = false; + + private MultiBucketIterator(List buckets, Scan scan) { + this.buckets = buckets; + this.scan = scan; + this.nextBucketIndex = 0; + // Eagerly open and prefetch the first bucket scanner so its first RPC is in-flight + // while the caller is setting up iteration. 
+ if (!buckets.isEmpty()) { + prefetchedScanner = openAndPrefetch(nextBucketIndex); + nextBucketIndex++; + } + } + + private BatchScanner openAndPrefetch(int index) { + BatchScanner scanner = scan.createBatchScanner(buckets.get(index)); + if (scanner instanceof KvBatchScanner) { + ((KvBatchScanner) scanner).prefetch(); + } + return scanner; + } + + @Override + public boolean hasNext() { + if (isClosed) { + return false; + } + while (currentScannerIterator == null || !currentScannerIterator.hasNext()) { + if (currentScannerIterator != null) { + currentScannerIterator.close(); + currentScannerIterator = null; + } + // Take the prefetched scanner (first RPC already in-flight), or open a new one. + BatchScanner nextScanner; + if (prefetchedScanner != null) { + nextScanner = prefetchedScanner; + prefetchedScanner = null; + } else if (nextBucketIndex < buckets.size()) { + nextScanner = scan.createBatchScanner(buckets.get(nextBucketIndex++)); + } else { + return false; + } + // Eagerly open and prefetch the scanner for the bucket after this one. 
+ if (nextBucketIndex < buckets.size()) { + prefetchedScanner = openAndPrefetch(nextBucketIndex); + nextBucketIndex++; + } + currentScannerIterator = new BatchScannerIterator(nextScanner); + } + return true; + } + + @Override + public InternalRow next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentScannerIterator.next(); + } + + @Override + public void close() { + if (!isClosed) { + isClosed = true; + if (prefetchedScanner != null) { + try { + prefetchedScanner.close(); + } catch (IOException e) { + throw new FlussRuntimeException("Error closing prefetched scanner", e); + } + prefetchedScanner = null; + } + if (currentScannerIterator != null) { + currentScannerIterator.close(); + } + } + } + } + + private static class BatchScannerIterator implements CloseableIterator { + private final BatchScanner scanner; + private Iterator currentBatch; + private boolean isClosed = false; + + private BatchScannerIterator(BatchScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean hasNext() { + ensureBatch(); + return currentBatch != null && currentBatch.hasNext(); + } + + @Override + public InternalRow next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentBatch.next(); + } + + private void ensureBatch() { + try { + while ((currentBatch == null || !currentBatch.hasNext()) && !isClosed) { + CloseableIterator it = scanner.pollBatch(BATCH_POLL_TIMEOUT); + if (it == null) { + isClosed = true; + break; + } + if (it.hasNext()) { + currentBatch = it; + } else { + it.close(); + } + } + } catch (IOException e) { + throw new FlussRuntimeException("Error polling batch from scanner", e); + } + } + + @Override + public void close() { + if (!isClosed) { + isClosed = true; + try { + scanner.close(); + } catch (IOException e) { + throw new FlussRuntimeException("Error closing scanner", e); + } + } + } + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java index 0d1d28a0e2..bcd5a2ec5b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.client.table.scanner.batch.KvBatchScanner; import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner; import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; @@ -123,10 +124,19 @@ public TypedLogScanner createTypedLogScanner(Class pojoClass) { @Override public BatchScanner createBatchScanner(TableBucket tableBucket) { + if (tableInfo.hasPrimaryKey()) { + return new KvBatchScanner( + tableInfo, + tableBucket, + schemaGetter, + conn.getMetadataUpdater(), + projectedColumns, + limit == null ? null : (long) limit); + } if (limit == null) { throw new UnsupportedOperationException( String.format( - "Currently, BatchScanner is only available when limit is set. Table: %s, bucket: %s", + "Currently, for log table, BatchScanner is only available when limit is set. Table: %s, bucket: %s", tableInfo.getTablePath(), tableBucket)); } return new LimitBatchScanner( diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java new file mode 100644 index 0000000000..eb4c7ac022 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner; + +import org.apache.fluss.client.FlussConnection; +import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** Implementation of {@link SnapshotQuery}. */ +public class TableSnapshotQuery implements SnapshotQuery { + + private final FlussConnection conn; + private final TableInfo tableInfo; + private final SchemaGetter schemaGetter; + + /** The projected fields to do projection. No projection if is null. 
*/ + @Nullable private final int[] projectedColumns; + + public TableSnapshotQuery( + FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) { + this(conn, tableInfo, schemaGetter, null); + } + + private TableSnapshotQuery( + FlussConnection conn, + TableInfo tableInfo, + SchemaGetter schemaGetter, + @Nullable int[] projectedColumns) { + this.conn = conn; + this.tableInfo = tableInfo; + this.schemaGetter = schemaGetter; + this.projectedColumns = projectedColumns; + } + + @Override + public CloseableIterator execute(TableBucket tableBucket) { + Scan scan = new TableScan(conn, tableInfo, schemaGetter); + if (projectedColumns != null) { + scan = scan.project(projectedColumns); + } + BatchScanner batchScanner = scan.createBatchScanner(tableBucket); + return new BatchScannerIterator(batchScanner); + } + + @Override + public CloseableIterator execute() { + List buckets = new ArrayList<>(); + try { + if (tableInfo.isPartitioned()) { + List partitions = + conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get(); + for (PartitionInfo partition : partitions) { + for (int i = 0; i < tableInfo.getNumBuckets(); i++) { + buckets.add( + new TableBucket( + tableInfo.getTableId(), partition.getPartitionId(), i)); + } + } + } else { + for (int i = 0; i < tableInfo.getNumBuckets(); i++) { + buckets.add(new TableBucket(tableInfo.getTableId(), i)); + } + } + } catch (Exception e) { + throw new FlussRuntimeException( + "Failed to list partitions for table " + tableInfo.getTablePath(), e); + } + + return new MultiBucketBatchScannerIterator(buckets); + } + + private class MultiBucketBatchScannerIterator implements CloseableIterator { + private final Iterator bucketIterator; + private CloseableIterator currentScannerIterator; + private boolean isClosed = false; + + private MultiBucketBatchScannerIterator(List buckets) { + this.bucketIterator = buckets.iterator(); + } + + @Override + public boolean hasNext() { + if (isClosed) { + return false; + } + while 
(currentScannerIterator == null || !currentScannerIterator.hasNext()) { + if (currentScannerIterator != null) { + currentScannerIterator.close(); + currentScannerIterator = null; + } + if (bucketIterator.hasNext()) { + currentScannerIterator = execute(bucketIterator.next()); + } else { + return false; + } + } + return true; + } + + @Override + public InternalRow next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentScannerIterator.next(); + } + + @Override + public void close() { + if (!isClosed) { + if (currentScannerIterator != null) { + currentScannerIterator.close(); + } + isClosed = true; + } + } + } + + private static class BatchScannerIterator implements CloseableIterator { + private final BatchScanner scanner; + private Iterator currentBatch; + private boolean isClosed = false; + + private BatchScannerIterator(BatchScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean hasNext() { + ensureBatch(); + return currentBatch != null && currentBatch.hasNext(); + } + + @Override + public InternalRow next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentBatch.next(); + } + + private void ensureBatch() { + try { + while ((currentBatch == null || !currentBatch.hasNext()) && !isClosed) { + CloseableIterator it = + scanner.pollBatch(Duration.ofMinutes(1)); // Use a large timeout + if (it == null) { + isClosed = true; + break; + } + if (it.hasNext()) { + currentBatch = it; + } else { + it.close(); + } + } + } catch (IOException e) { + throw new RuntimeException("Error polling batch from scanner", e); + } + } + + @Override + public void close() { + if (!isClosed) { + try { + scanner.close(); + } catch (IOException e) { + throw new RuntimeException("Error closing scanner", e); + } + isClosed = true; + } + } + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java new file mode 100644 index 0000000000..427d1a22d3 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.LeaderNotAvailableException; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.ValueRecord; +import org.apache.fluss.record.ValueRecordReadContext; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.SchemaUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** A {@link BatchScanner} implementation that scans records from a primary key table. 
*/ +public class KvBatchScanner implements BatchScanner { + private static final Logger LOG = LoggerFactory.getLogger(KvBatchScanner.class); + + private final TableInfo tableInfo; + private final TableBucket tableBucket; + private final SchemaGetter schemaGetter; + private final MetadataUpdater metadataUpdater; + @Nullable private final int[] projectedFields; + @Nullable private final Long limit; + private final int targetSchemaId; + private final KvFormat kvFormat; + private final int batchSizeBytes; + + private final Map schemaProjectionCache = new HashMap<>(); + private final ValueRecordReadContext readContext; + + private byte[] scannerId; + private int callSeqId = 0; + private boolean hasMoreResults = true; + private boolean isClosed = false; + + private CompletableFuture inFlightRequest; + + public KvBatchScanner( + TableInfo tableInfo, + TableBucket tableBucket, + SchemaGetter schemaGetter, + MetadataUpdater metadataUpdater, + @Nullable int[] projectedFields, + @Nullable Long limit) { + this.tableInfo = tableInfo; + this.tableBucket = tableBucket; + this.schemaGetter = schemaGetter; + this.metadataUpdater = metadataUpdater; + this.projectedFields = projectedFields; + this.limit = limit; + this.targetSchemaId = tableInfo.getSchemaId(); + this.kvFormat = tableInfo.getTableConfig().getKvFormat(); + this.batchSizeBytes = + (int) + tableInfo + .getTableConfig() + .getConfiguration() + .get(ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES) + .getBytes(); + + this.readContext = ValueRecordReadContext.createReadContext(schemaGetter, kvFormat); + } + + @Nullable + @Override + public CloseableIterator pollBatch(Duration timeout) throws IOException { + if (isClosed || (!hasMoreResults && inFlightRequest == null)) { + return null; + } + + try { + if (inFlightRequest == null) { + sendRequest(); + } + + ScanKvResponse response = + inFlightRequest.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + inFlightRequest = null; + + if (response.hasErrorCode() && 
response.getErrorCode() != Errors.NONE.code()) { + throw Errors.forCode(response.getErrorCode()).exception(response.getErrorMessage()); + } + + this.scannerId = response.getScannerId(); + this.hasMoreResults = response.isHasMoreResults(); + this.callSeqId++; + + List rows = parseScanKvResponse(response); + + // pipeline: send next request if there are more results + if (hasMoreResults) { + sendRequest(); + } + + return CloseableIterator.wrap(rows.iterator()); + } catch (java.util.concurrent.TimeoutException e) { + return CloseableIterator.emptyIterator(); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Fires the first RPC eagerly so the network round-trip overlaps with draining the previous + * bucket. No-op if a request is already in-flight or the scanner is closed/exhausted. + */ + public void prefetch() { + if (!isClosed && hasMoreResults && inFlightRequest == null) { + sendRequest(); + } + } + + private void sendRequest() { + int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket); + TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader); + if (gateway == null) { + throw new LeaderNotAvailableException( + "Server " + leader + " is not found in metadata cache."); + } + + ScanKvRequest request = new ScanKvRequest(); + request.setBatchSizeBytes(batchSizeBytes); + if (scannerId == null) { + PbScanReqForBucket bucketScanReq = request.setBucketScanReq(); + bucketScanReq.setTableId(tableBucket.getTableId()).setBucketId(tableBucket.getBucket()); + if (tableBucket.getPartitionId() != null) { + bucketScanReq.setPartitionId(tableBucket.getPartitionId()); + } + if (limit != null) { + bucketScanReq.setLimit(limit); + } + request.setCallSeqId(0); + } else { + request.setScannerId(scannerId).setCallSeqId(callSeqId); + } + + inFlightRequest = gateway.scanKv(request); + } + + private List parseScanKvResponse(ScanKvResponse response) { + if (!response.hasRecords()) { + return Collections.emptyList(); + } 
+ + List scanRows = new ArrayList<>(64); + ByteBuffer recordsBuffer = ByteBuffer.wrap(response.getRecords()); + DefaultValueRecordBatch valueRecords = + DefaultValueRecordBatch.pointToByteBuffer(recordsBuffer); + + for (ValueRecord record : valueRecords.records(readContext)) { + InternalRow row = record.getRow(); + if (targetSchemaId != record.schemaId()) { + int[] indexMapping = + schemaProjectionCache.computeIfAbsent( + record.schemaId(), + sourceSchemaId -> + SchemaUtil.getIndexMapping( + schemaGetter.getSchema(sourceSchemaId), + schemaGetter.getSchema(targetSchemaId))); + row = ProjectedRow.from(indexMapping).replaceRow(row); + } + if (projectedFields != null) { + row = applyColumnProjection(row); + } + scanRows.add(row); + } + return scanRows; + } + + private InternalRow applyColumnProjection(InternalRow row) { + return ProjectedRow.from(projectedFields).replaceRow(row); + } + + @Override + public void close() throws IOException { + if (isClosed) { + return; + } + isClosed = true; + + if (scannerId != null && hasMoreResults) { + // Close scanner on server + int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket); + TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader); + if (gateway != null) { + gateway.scanKv( + new ScanKvRequest() + .setScannerId(scannerId) + .setCallSeqId(callSeqId) + .setCloseScanner(true)); + } + } + + if (inFlightRequest != null) { + inFlightRequest.cancel(true); + } + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvScanITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvScanITCase.java new file mode 100644 index 0000000000..83103c78a4 --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvScanITCase.java @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.client.admin.ClientToServerITCaseBase; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + 
+/** + * Integration tests for full KV scan via {@code table.newKvScan().execute()}. + * + *

All non-partitioned tables use {@link #NUM_BUCKETS} buckets to exercise multi-bucket fan-out + * on the 3-node cluster provided by {@link ClientToServerITCaseBase}. Each test calls {@link + * #waitAllReplicasReady} (or {@link #waitPartitionedTableReplicasReady}) after table creation to + * ensure leader election completes before scanning — especially important for empty tables or + * partitions that have no upsert traffic as a natural synchronization barrier. + */ +public class KvScanITCase extends ClientToServerITCaseBase { + + private static final int NUM_BUCKETS = 3; + + /** Schema shared by all non-partitioned PK tests: (id INT, name STRING), PK = id. */ + private static final Schema PK_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + + /** Schema shared by partitioned PK tests: (id INT, p STRING, name STRING), PK = (id, p). */ + private static final Schema PARTITIONED_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("p", DataTypes.STRING()) + .column("name", DataTypes.STRING()) + .primaryKey("id", "p") + .build(); + + // ------------------------------------------------------------------------- + // Basic / structural tests + // ------------------------------------------------------------------------- + + @Test + void testBasicScan() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_basic_scan")); + + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(row(1, "a")); + writer.upsert(row(2, "b")); + writer.upsert(row(3, "c")); + writer.flush(); + + List result = kvScanAll(table); + + assertThat(result).hasSize(3); + result.sort(Comparator.comparingInt(r -> r.getInt(0))); + assertThatRow(result.get(0)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(1, "a")); + assertThatRow(result.get(1)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(2, "b")); + 
assertThatRow(result.get(2)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(3, "c")); + } + + @Test + void testEmptyTable() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_empty_table")); + // No upsert traffic — waitAllReplicasReady inside createPkTable ensures leaders are ready. + assertThat(kvScanAll(table)).isEmpty(); + } + + @Test + void testLargeDataScan() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_large_data_scan")); + + int rowCount = 10_000; + UpsertWriter writer = table.newUpsert().createWriter(); + for (int i = 0; i < rowCount; i++) { + writer.upsert(row(i, "val" + i)); + } + writer.flush(); + + List result = kvScanAll(table); + + assertThat(result).hasSize(rowCount); + result.sort(Comparator.comparingInt(r -> r.getInt(0))); + for (int i = 0; i < rowCount; i++) { + assertThatRow(result.get(i)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(i, "val" + i)); + } + } + + // ------------------------------------------------------------------------- + // Partitioned table + // ------------------------------------------------------------------------- + + @Test + void testPartitionedTableScan() throws Exception { + TablePath tablePath = TablePath.of("test_db", "test_partitioned_scan"); + long tableId = createPartitionedTable(tablePath); + + admin.createPartition( + tablePath, new PartitionSpec(Collections.singletonMap("p", "p1")), false) + .get(); + admin.createPartition( + tablePath, new PartitionSpec(Collections.singletonMap("p", "p2")), false) + .get(); + waitPartitionedTableReplicasReady(tableId, tablePath); + + Table table = conn.getTable(tablePath); + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(row(1, "p1", "a1")); + writer.upsert(row(2, "p1", "b1")); + writer.upsert(row(1, "p2", "a2")); + writer.flush(); + + List result = kvScanAll(table); + assertThat(result).hasSize(3); + result.sort( + Comparator.comparingInt((InternalRow r) -> r.getInt(0)) + 
.thenComparing(r -> r.getString(1).toString())); + assertThatRow(result.get(0)) + .withSchema(PARTITIONED_SCHEMA.getRowType()) + .isEqualTo(row(1, "p1", "a1")); + assertThatRow(result.get(1)) + .withSchema(PARTITIONED_SCHEMA.getRowType()) + .isEqualTo(row(1, "p2", "a2")); + assertThatRow(result.get(2)) + .withSchema(PARTITIONED_SCHEMA.getRowType()) + .isEqualTo(row(2, "p1", "b1")); + } + + @Test + void testPartitionedTableEmptyPartition() throws Exception { + TablePath tablePath = TablePath.of("test_db", "test_partitioned_empty"); + long tableId = createPartitionedTable(tablePath); + + // p1 will have data; p2 will be empty + admin.createPartition( + tablePath, new PartitionSpec(Collections.singletonMap("p", "p1")), false) + .get(); + admin.createPartition( + tablePath, new PartitionSpec(Collections.singletonMap("p", "p2")), false) + .get(); + // Explicitly wait for p2's bucket replicas — it has no writes to act as a barrier + waitPartitionedTableReplicasReady(tableId, tablePath); + + Table table = conn.getTable(tablePath); + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(row(1, "p1", "a1")); + writer.upsert(row(2, "p1", "b1")); + writer.flush(); + + List result = kvScanAll(table); + // Only p1 rows; p2 is empty and must not contribute any rows + assertThat(result).hasSize(2); + } + + // ------------------------------------------------------------------------- + // Data correctness + // ------------------------------------------------------------------------- + + @Test + void testDeleteVisibility() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_delete_visibility")); + + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(row(1, "a")); + writer.upsert(row(2, "b")); + writer.upsert(row(3, "c")); + writer.flush(); + + writer.delete(row(2, "b")); + writer.flush(); + + List result = kvScanAll(table); + + assertThat(result).hasSize(2); + result.sort(Comparator.comparingInt(r -> r.getInt(0))); + 
assertThatRow(result.get(0)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(1, "a")); + assertThatRow(result.get(1)).withSchema(PK_SCHEMA.getRowType()).isEqualTo(row(3, "c")); + } + + @Test + void testUpsertOverwrite() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_upsert_overwrite")); + + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(row(1, "original")); + writer.flush(); + + writer.upsert(row(1, "updated")); + writer.flush(); + + List result = kvScanAll(table); + + assertThat(result).hasSize(1); + assertThatRow(result.get(0)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(1, "updated")); + } + + /** + * Verifies point-in-time snapshot isolation: a scan that completes before any mutations only + * sees the original state; a scan that starts after mutations sees the updated state. + */ + @Test + void testSnapshotIsolation() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_snapshot_isolation")); + + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(row(1, "a")); + writer.upsert(row(2, "b")); + writer.upsert(row(3, "c")); + writer.flush(); + + // First scan: captures state {1, 2, 3} and fully drains before any mutation. + List beforeMutation = kvScanAll(table); + + // Mutate: add row 4, delete row 1. + writer.upsert(row(4, "d")); + writer.delete(row(1, "a")); + writer.flush(); + + // Second scan: fresh snapshot after mutations must see {2, 3, 4}. 
+ List afterMutation = kvScanAll(table); + + assertThat(beforeMutation).hasSize(3); + beforeMutation.sort(Comparator.comparingInt(r -> r.getInt(0))); + assertThatRow(beforeMutation.get(0)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(1, "a")); + assertThatRow(beforeMutation.get(1)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(2, "b")); + assertThatRow(beforeMutation.get(2)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(3, "c")); + + assertThat(afterMutation).hasSize(3); + afterMutation.sort(Comparator.comparingInt(r -> r.getInt(0))); + assertThatRow(afterMutation.get(0)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(2, "b")); + assertThatRow(afterMutation.get(1)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(3, "c")); + assertThatRow(afterMutation.get(2)) + .withSchema(PK_SCHEMA.getRowType()) + .isEqualTo(row(4, "d")); + } + + // ------------------------------------------------------------------------- + // Iterator lifecycle + // ------------------------------------------------------------------------- + + @Test + void testEarlyClose() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_early_close")); + + UpsertWriter writer = table.newUpsert().createWriter(); + for (int i = 0; i < 1000; i++) { + writer.upsert(row(i, "val" + i)); + } + writer.flush(); + + int readCount = 0; + try (CloseableIterator iterator = table.newKvScan().execute()) { + while (iterator.hasNext() && readCount < 5) { + iterator.next(); + readCount++; + } + } + assertThat(readCount).isEqualTo(5); + } + + @Test + void testIteratorCloseIdempotent() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_close_idempotent")); + + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(row(1, "a")); + writer.flush(); + + CloseableIterator iterator = table.newKvScan().execute(); + while (iterator.hasNext()) { + iterator.next(); + } + // Second close must be a no-op, not throw + 
iterator.close(); + iterator.close(); + } + + // ------------------------------------------------------------------------- + // Error / guard tests + // ------------------------------------------------------------------------- + + @Test + void testNonPrimaryKeyTableThrows() throws Exception { + TablePath tablePath = TablePath.of("test_db", "test_log_table"); + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .build(); + TableDescriptor descriptor = + TableDescriptor.builder().schema(schema).distributedBy(NUM_BUCKETS).build(); + + createTable(tablePath, descriptor, false); + Table table = conn.getTable(tablePath); + + assertThatThrownBy(() -> table.newKvScan()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("not a Primary Key Table"); + } + + // ------------------------------------------------------------------------- + // Concurrency + // ------------------------------------------------------------------------- + + @Test + void testConcurrentScans() throws Exception { + Table table = createPkTable(TablePath.of("test_db", "test_concurrent_scans")); + + int rowCount = 100; + UpsertWriter writer = table.newUpsert().createWriter(); + for (int i = 0; i < rowCount; i++) { + writer.upsert(row(i, "val" + i)); + } + writer.flush(); + + int concurrency = 4; + ExecutorService executor = Executors.newFixedThreadPool(concurrency); + AtomicInteger totalRows = new AtomicInteger(0); + List> futures = new ArrayList<>(); + + for (int t = 0; t < concurrency; t++) { + futures.add( + executor.submit( + () -> { + try { + totalRows.addAndGet(kvScanAll(table).size()); + } catch (Exception e) { + throw new FlussRuntimeException(e); + } + })); + } + + for (Future f : futures) { + f.get(); + } + executor.shutdown(); + + // Each concurrent scan must see all rows + assertThat(totalRows.get()).isEqualTo(rowCount * concurrency); + } + + // ------------------------------------------------------------------------- 
+ // Helpers + // ------------------------------------------------------------------------- + + /** + * Creates a non-partitioned primary key table using {@link #PK_SCHEMA} with {@link + * #NUM_BUCKETS} buckets, waits for all replicas to be ready, and returns an open {@link Table}. + */ + private Table createPkTable(TablePath path) throws Exception { + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(PK_SCHEMA) + .distributedBy(NUM_BUCKETS, "id") + .build(); + long tableId = createTable(path, descriptor, true); + waitAllReplicasReady(tableId, NUM_BUCKETS); + return conn.getTable(path); + } + + /** + * Creates a partitioned primary key table using {@link #PARTITIONED_SCHEMA} with {@link + * #NUM_BUCKETS} buckets and returns the table ID. Partitions must be created separately before + * scanning. + */ + private long createPartitionedTable(TablePath path) throws Exception { + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(PARTITIONED_SCHEMA) + .partitionedBy("p") + .distributedBy(NUM_BUCKETS, "id") + .build(); + return createTable(path, descriptor, true); + } + + /** + * Waits until all bucket replicas are ready for every partition of a partitioned table. This is + * critical for empty partitions that have no write traffic to act as a natural barrier. 
+ */ + private void waitPartitionedTableReplicasReady(long tableId, TablePath tablePath) + throws Exception { + List partitions = admin.listPartitionInfos(tablePath).get(); + for (PartitionInfo partition : partitions) { + for (int i = 0; i < NUM_BUCKETS; i++) { + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady( + new TableBucket(tableId, partition.getPartitionId(), i)); + } + } + } + + private List kvScanAll(Table table) throws Exception { + List rows = new ArrayList<>(); + try (CloseableIterator iterator = table.newKvScan().execute()) { + while (iterator.hasNext()) { + rows.add(iterator.next()); + } + } + return rows; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 7e04c3a1cc..8baf4c351e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -439,6 +439,40 @@ public class ConfigOptions { + WRITER_ID_EXPIRATION_TIME.key() + " passing. The default value is 10 minutes."); + public static final ConfigOption SERVER_SCANNER_TTL = + key("server.scanner.ttl") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "The time that the tablet server will wait without receiving any scan request from " + + "a client before expiring the related status. The default value is 10 minutes."); + + public static final ConfigOption SERVER_SCANNER_EXPIRATION_INTERVAL = + key("server.scanner.expiration-interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "How often the TTL reaper runs to close idle scanner sessions. " + + "The default value is 30 seconds."); + + public static final ConfigOption SERVER_SCANNER_MAX_PER_BUCKET = + key("server.scanner.max-per-bucket") + .intType() + .defaultValue(8) + .withDescription( + "Maximum number of concurrent scanner sessions per bucket. 
" + + "Exceeding this limit returns TOO_MANY_SCANNERS. " + + "The default value is 8."); + + public static final ConfigOption SERVER_SCANNER_MAX_PER_SERVER = + key("server.scanner.max-per-server") + .intType() + .defaultValue(200) + .withDescription( + "Maximum number of concurrent scanner sessions per tablet server. " + + "Exceeding this limit returns TOO_MANY_SCANNERS. " + + "The default value is 200."); + public static final ConfigOption TABLET_SERVER_CONTROLLED_SHUTDOWN_MAX_RETRIES = key("tablet-server.controlled-shutdown.max-retries") .intType() @@ -1093,6 +1127,14 @@ public class ConfigOptions { + "will still be returned to ensure that the fetch can make progress. As such, " + "this is not a absolute maximum."); + public static final ConfigOption CLIENT_SCANNER_KV_FETCH_MAX_BYTES = + key("client.scanner.kv.fetch.max-bytes") + .memoryType() + .defaultValue(MemorySize.parse("1mb")) + .withDescription( + "The maximum amount of data the server should return for a kv scan request. " + + "The default value is 1mb."); + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET = key("client.scanner.log.fetch.max-bytes-for-bucket") .memoryType() diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index 984a1def4a..b972a80ff8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -50,6 +50,11 @@ public TableConfig(Configuration config) { this.config = config; } + /** Gets the table properties configuration. */ + public Configuration getConfiguration() { + return config; + } + /** Gets the replication factor of the table. 
*/ public int getReplicationFactor() { return config.get(ConfigOptions.TABLE_REPLICATION_FACTOR); diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/InvalidScanRequestException.java b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidScanRequestException.java new file mode 100644 index 0000000000..9c105f1b05 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidScanRequestException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** + * Thrown when a ScanKv request is malformed, for example when both {@code scanner_id} and {@code + * bucket_scan_req} are set simultaneously, or when neither is set, or when the {@code call_seq_id} + * is out of order. 
+ */ +public class InvalidScanRequestException extends ApiException { + private static final long serialVersionUID = 1L; + + public InvalidScanRequestException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ScannerExpiredException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerExpiredException.java new file mode 100644 index 0000000000..efdb785113 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerExpiredException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** + * Thrown when a scanner session has expired due to TTL elapsing on the server. The client must + * restart the scan from the beginning. 
+ */ +public class ScannerExpiredException extends ApiException { + private static final long serialVersionUID = 1L; + + public ScannerExpiredException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java new file mode 100644 index 0000000000..e146cb3b09 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Exception thrown when a scanner is not found. 
*/ +public class ScannerNotFoundException extends ApiException { + private static final long serialVersionUID = 1L; + + public ScannerNotFoundException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/TooManyScannersException.java b/fluss-common/src/main/java/org/apache/fluss/exception/TooManyScannersException.java new file mode 100644 index 0000000000..4a791b1a98 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/TooManyScannersException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** + * Thrown when the per-bucket or per-server scanner session limit is reached. The client should + * back off and retry after a delay. 
+ */ +public class TooManyScannersException extends ApiException { + private static final long serialVersionUID = 1L; + + public TooManyScannersException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/UnknownScannerIdException.java b/fluss-common/src/main/java/org/apache/fluss/exception/UnknownScannerIdException.java new file mode 100644 index 0000000000..a9817466de --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/UnknownScannerIdException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** + * Thrown when a scanner ID is not found on the server and the session was not TTL-expired (e.g., + * server restarted, leadership changed, or the session was explicitly closed). The client must + * restart the scan from the beginning. 
+ */ +public class UnknownScannerIdException extends ApiException { + private static final long serialVersionUID = 1L; + + public UnknownScannerIdException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java index 578b74e5e2..932b415650 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java @@ -42,6 +42,8 @@ import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; @@ -130,6 +132,15 @@ CompletableFuture notifyLeaderAndIsr( @RPC(api = ApiKeys.LIMIT_SCAN) CompletableFuture limitScan(LimitScanRequest request); + /** + * Scan kv data from the specified table bucket. + * + * @param request the scan kv request + * @return the scan kv response + */ + @RPC(api = ApiKeys.SCAN_KV) + CompletableFuture scanKv(ScanKvRequest request); + /** * List offsets for the specified table bucket. 
* diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index cc033ba8a9..89cb14e96e 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -80,7 +80,8 @@ public enum ApiKeys { REBALANCE(1049, 0, 0, PUBLIC), LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC), CANCEL_REBALANCE(1051, 0, 0, PUBLIC), - PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE); + PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE), + SCAN_KV(1053, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 5ee652cce2..f43e6cfa7a 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -38,6 +38,7 @@ import org.apache.fluss.exception.InvalidPartitionException; import org.apache.fluss.exception.InvalidReplicationFactorException; import org.apache.fluss.exception.InvalidRequiredAcksException; +import org.apache.fluss.exception.InvalidScanRequestException; import org.apache.fluss.exception.InvalidServerRackInfoException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.InvalidTargetColumnException; @@ -64,6 +65,8 @@ import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.RecordTooLargeException; import org.apache.fluss.exception.RetriableAuthenticationException; +import org.apache.fluss.exception.ScannerExpiredException; +import org.apache.fluss.exception.ScannerNotFoundException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.SecurityTokenException; @@ -77,6 
+80,8 @@ import org.apache.fluss.exception.TimeoutException; import org.apache.fluss.exception.TooManyBucketsException; import org.apache.fluss.exception.TooManyPartitionsException; +import org.apache.fluss.exception.TooManyScannersException; +import org.apache.fluss.exception.UnknownScannerIdException; import org.apache.fluss.exception.UnknownServerException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.exception.UnknownWriterIdException; @@ -240,7 +245,22 @@ public enum Errors { SEVER_TAG_NOT_EXIST_EXCEPTION(60, "The server tag not exist.", ServerTagNotExistException::new), REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new), NO_REBALANCE_IN_PROGRESS_EXCEPTION( - 62, "No rebalance task in progress.", NoRebalanceInProgressException::new); + 62, "No rebalance task in progress.", NoRebalanceInProgressException::new), + /** @deprecated Superseded by {@link #UNKNOWN_SCANNER_ID} (65) and {@link #SCANNER_EXPIRED} (64). 
*/ + @Deprecated + SCANNER_NOT_FOUND_EXCEPTION(63, "The scanner is not found.", ScannerNotFoundException::new), + SCANNER_EXPIRED( + 64, + "The scanner session has expired due to inactivity.", + ScannerExpiredException::new), + UNKNOWN_SCANNER_ID( + 65, "The scanner id is not recognized by the server.", UnknownScannerIdException::new), + INVALID_SCAN_REQUEST( + 66, "The scan request is invalid.", InvalidScanRequestException::new), + TOO_MANY_SCANNERS( + 67, + "The per-bucket or per-server scanner session limit has been reached.", + TooManyScannersException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index db9d614354..12bb81386e 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -255,6 +255,71 @@ message PrefixLookupResponse { } +// scan kv request and response +message ScanKvRequest { + // If continuing an existing scan, then you must set scanner_id. + // Otherwise, you must set 'new_scan_request'. + optional bytes scanner_id = 1; + optional PbScanReqForBucket bucket_scan_req = 2; + + // The sequence ID of this call. The sequence ID should start at 0 + // with the request for a new scanner, and after each successful request, + // the client should increment it by 1. When retrying a request, the client + // should _not_ increment this value. If the server detects that the client + // missed a chunk of rows from the middle of a scan, it will respond with an + // error. + optional uint32 call_seq_id = 3; + + // The maximum number of bytes to send in the response. + optional uint32 batch_size_bytes = 4; + + // If set, the server will close the scanner after responding to + // this request, regardless of whether all rows have been delivered. + optional bool close_scanner = 5; +} + +message PbScanReqForBucket { + // The tablet to scan. 
+ required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + + // The maximum number of rows to scan with the new scanner. + // + // The scanner will automatically stop yielding results and close itself + // after reaching this number of result rows. + optional uint64 limit = 4; +} + +message ScanKvResponse { + // The error, if an error occurred with this request. + optional int32 error_code = 1; + optional string error_message = 2; + + // When a scanner is created, returns the scanner ID which may be used + // to pull new rows from the scanner. + optional bytes scanner_id = 3; + + // Set to true to indicate that there may be further results to be fetched + // from this scanner. If the scanner has no more results, then the scanner + // ID will become invalid and cannot continue to be used. + // + // Note that if a scan returns no results, then the initial response from + // the first RPC may return false in this flag, in which case there will + // be no scanner ID assigned. + optional bool has_more_results = 4; + + // The block of returned rows. + // + // NOTE: the schema-related fields will not be present in this row block. + // The schema will match the schema requested by the client when it created + // the scanner. 
+ optional bytes records = 5; + + // Returns the corresponding log offset at the time the scanner is created + optional int64 log_offset = 6; +} + // limit scan request and response message LimitScanRequest { required int64 table_id = 2; diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 7db3654383..1bfada8c7f 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -71,6 +71,8 @@ import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; @@ -135,6 +137,11 @@ public CompletableFuture limitScan(LimitScanRequest request) return null; } + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + return null; + } + @Override public CompletableFuture listOffsets(ListOffsetsRequest request) { return null; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java index f3998f4435..d7ab654151 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java @@ -32,6 +32,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; import org.rocksdb.Statistics; import org.rocksdb.WriteOptions; @@ -151,6 +152,18 @@ public List limitScan(Integer limit) { return 
pkList; } + public Snapshot getSnapshot() { + return db.getSnapshot(); + } + + public void releaseSnapshot(Snapshot snapshot) { + db.releaseSnapshot(snapshot); + } + + public RocksIterator newIterator(ReadOptions readOptions) { + return db.newIterator(defaultColumnFamilyHandle, readOptions); + } + public void put(byte[] key, byte[] value) throws IOException { try { db.put(writeOptions, key, value); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java new file mode 100644 index 0000000000..914a8faeef --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.server.utils.ResourceGuard; +import org.apache.fluss.utils.clock.Clock; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.nio.ByteBuffer; + +/** + * The context for a scanner session. Each instance holds a RocksDB snapshot and iterator for + * point-in-time scan isolation. The {@link ResourceGuard.Lease} prevents RocksDB from being closed + * while this session is active. + * + *
<p>
This class is not thread-safe; all fields must only be accessed by a single thread at a time. + */ +@NotThreadSafe +public class ScannerContext implements AutoCloseable { + private final byte[] scannerId; + /** Pre-computed map key to avoid per-RPC {@code ByteBuffer.wrap(scannerId)} allocation. */ + private final ByteBuffer scannerIdKey; + private final TableBucket tableBucket; + private final RocksDBKv rocksDBKv; + private final RocksIterator iterator; + private final ReadOptions readOptions; + private final Snapshot snapshot; + private final ResourceGuard.Lease resourceLease; + + private int callSeqId; + private long lastAccessTime; + + public ScannerContext( + byte[] scannerId, + TableBucket tableBucket, + RocksDBKv rocksDBKv, + RocksIterator iterator, + ReadOptions readOptions, + Snapshot snapshot, + ResourceGuard.Lease resourceLease, + Clock clock) { + this.scannerId = scannerId; + this.scannerIdKey = ByteBuffer.wrap(scannerId); + this.tableBucket = tableBucket; + this.rocksDBKv = rocksDBKv; + this.iterator = iterator; + this.readOptions = readOptions; + this.snapshot = snapshot; + this.resourceLease = resourceLease; + this.callSeqId = 0; + this.lastAccessTime = clock.milliseconds(); + } + + public byte[] getScannerId() { + return scannerId; + } + + public ByteBuffer getScannerIdKey() { + return scannerIdKey.duplicate(); + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public RocksIterator getIterator() { + return iterator; + } + + public int getCallSeqId() { + return callSeqId; + } + + public void setCallSeqId(int callSeqId) { + this.callSeqId = callSeqId; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public void updateLastAccessTime(long lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + + @Override + public void close() { + try { + iterator.close(); + } finally { + try { + readOptions.close(); + } finally { + try { + rocksDBKv.releaseSnapshot(snapshot); + } finally { + 
resourceLease.close(); + } + } + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java new file mode 100644 index 0000000000..36fb5f93cd --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TooManyScannersException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.server.utils.ResourceGuard; +import org.apache.fluss.utils.AutoCloseableAsync; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; +import org.apache.fluss.utils.concurrent.FutureUtils; +import org.apache.fluss.utils.concurrent.Scheduler; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manages scanner sessions for KV snapshot queries. Each session holds a RocksDB snapshot and + * iterator. Sessions that are idle longer than the TTL are reaped automatically by a background + * task. + */ +public class ScannerManager implements AutoCloseableAsync { + private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class); + + private final Map scanners = MapUtils.newConcurrentHashMap(); + /** + * Tracks recently-expired scanner IDs and their expiry timestamp. Allows distinguishing between + * a scanner that expired (SCANNER_EXPIRED) and one that was never created (UNKNOWN_SCANNER_ID). + * Entries are pruned by the cleanup task after {@link #recentlyExpiredRetentionMs} to bound + * memory usage. 
+ */ + private final Map recentlyExpiredIds = MapUtils.newConcurrentHashMap(); + private final Scheduler scheduler; + private final Clock clock; + private final long scannerTtlMs; + /** How long recently-expired IDs are retained for diagnostic error reporting (2 × TTL). */ + private final long recentlyExpiredRetentionMs; + private final int maxPerBucket; + private final int maxPerServer; + private final AtomicInteger totalScanners = new AtomicInteger(0); + private ScheduledFuture cleanupTask; + + public ScannerManager(Configuration conf, Scheduler scheduler) { + this(conf, scheduler, SystemClock.getInstance()); + } + + public ScannerManager(Configuration conf, Scheduler scheduler, Clock clock) { + this.scheduler = scheduler; + this.clock = clock; + this.scannerTtlMs = conf.get(ConfigOptions.SERVER_SCANNER_TTL).toMillis(); + this.recentlyExpiredRetentionMs = 2 * scannerTtlMs; + long expirationIntervalMs = + conf.get(ConfigOptions.SERVER_SCANNER_EXPIRATION_INTERVAL).toMillis(); + this.maxPerBucket = conf.get(ConfigOptions.SERVER_SCANNER_MAX_PER_BUCKET); + this.maxPerServer = conf.get(ConfigOptions.SERVER_SCANNER_MAX_PER_SERVER); + this.cleanupTask = + scheduler.schedule( + "scanner-expiration", + this::cleanupExpiredScanners, + expirationIntervalMs, + expirationIntervalMs); + } + + /** + * Creates a new scanner session for the given KV tablet and returns the {@link ScannerContext}, + * or {@code null} if the bucket is empty (no rows to scan). + * + *
<p>
When {@code null} is returned, no scanner session is registered and no limit slot is + * consumed. The caller should return an empty response immediately. + * + *
<p>
Note on limit enforcement: The per-bucket and server-level limit checks are not + * atomic with the subsequent {@code scanners.put()}. This is an intentional trade-off: the + * limits are soft guards against runaway scanner creation, not hard resource quotas. In the + * rare race where two threads both pass the limit check before either registers, the server + * may briefly exceed the configured maximum by a small amount. + * + * @throws TooManyScannersException if per-bucket or per-server limits are exceeded + * @throws IOException if the RocksDB resource guard cannot be acquired + */ + @Nullable + public ScannerContext createScanner(KvTablet kvTablet, TableBucket tableBucket) + throws IOException { + // Check server-level limit first (cheap atomic check) + if (totalScanners.get() >= maxPerServer) { + throw new TooManyScannersException( + "Server scanner limit (" + + maxPerServer + + ") reached. Try again later."); + } + + // Check per-bucket limit + int bucketCount = countScannersForBucket(tableBucket); + if (bucketCount >= maxPerBucket) { + throw new TooManyScannersException( + "Per-bucket scanner limit (" + + maxPerBucket + + ") reached for bucket " + + tableBucket + + ". Try again later."); + } + + RocksDBKv rocksDBKv = kvTablet.getRocksDBKv(); + // Acquire a lease to prevent RocksDB from being closed while iterating. + // All subsequent resource allocations are guarded by a try/catch so the lease + // is always released on failure. + ResourceGuard.Lease lease = rocksDBKv.getResourceGuard().acquireResource(); + try { + Snapshot snapshot = rocksDBKv.getSnapshot(); + ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot); + RocksIterator iterator = rocksDBKv.newIterator(readOptions); + iterator.seekToFirst(); + + if (!iterator.isValid()) { + // Empty bucket: release all resources without registering a scanner session. 
+ iterator.close(); + readOptions.close(); + rocksDBKv.releaseSnapshot(snapshot); + lease.close(); + return null; + } + + byte[] scannerId = generateScannerId(); + ScannerContext context = + new ScannerContext( + scannerId, + tableBucket, + rocksDBKv, + iterator, + readOptions, + snapshot, + lease, + clock); + scanners.put(context.getScannerIdKey(), context); + totalScanners.incrementAndGet(); + return context; + } catch (Exception e) { + lease.close(); + throw e; + } + } + + /** + * Looks up a scanner by id and refreshes its last-access timestamp. Returns {@code null} if + * the scanner is not registered (was never created or has already been removed). + */ + @Nullable + public ScannerContext getScanner(byte[] scannerId) { + ScannerContext context = scanners.get(ByteBuffer.wrap(scannerId)); + if (context != null) { + context.updateLastAccessTime(clock.milliseconds()); + } + return context; + } + + /** + * Returns {@code true} if the scanner with the given id was recently expired by the TTL reaper. + * This allows callers to distinguish a SCANNER_EXPIRED error from an UNKNOWN_SCANNER_ID error + * when {@link #getScanner(byte[])} returns {@code null}. + */ + public boolean isRecentlyExpired(byte[] scannerId) { + return recentlyExpiredIds.containsKey(ByteBuffer.wrap(scannerId)); + } + + /** + * Removes and closes the scanner identified by {@code scannerId}. No-op if the id is unknown. + */ + public void removeScanner(byte[] scannerId) { + ScannerContext context = scanners.remove(ByteBuffer.wrap(scannerId)); + if (context != null) { + totalScanners.decrementAndGet(); + closeScannerContext(context); + } + } + + /** + * Removes and closes a known scanner context directly, avoiding a map lookup. Prefer this over + * {@link #removeScanner(byte[])} when the caller already holds the {@link ScannerContext}. 
+ */ + public void removeScanner(ScannerContext context) { + if (scanners.remove(context.getScannerIdKey(), context)) { + totalScanners.decrementAndGet(); + closeScannerContext(context); + } + } + + /** + * Closes and removes all active scanner sessions for the given bucket. Called when a bucket + * loses leadership to prevent stale RocksDB snapshot/iterator leaks. + */ + public void closeScannersForBucket(TableBucket tableBucket) { + List toRemove = new ArrayList<>(); + for (Map.Entry entry : scanners.entrySet()) { + if (tableBucket.equals(entry.getValue().getTableBucket())) { + toRemove.add(entry.getKey()); + } + } + for (ByteBuffer key : toRemove) { + ScannerContext context = scanners.remove(key); + if (context != null) { + totalScanners.decrementAndGet(); + LOG.info( + "Closing scanner {} for bucket {} due to leadership change.", + scannerIdToString(key.array()), + tableBucket); + closeScannerContext(context); + } + } + } + + /** + * Counts active scanners for a given bucket. O(n) over total scanners — acceptable because + * {@code maxPerServer} is small (default 200) and this path is only on scanner creation. 
+ */ + private int countScannersForBucket(TableBucket tableBucket) { + int count = 0; + for (ScannerContext ctx : scanners.values()) { + if (tableBucket.equals(ctx.getTableBucket())) { + count++; + } + } + return count; + } + + private void cleanupExpiredScanners() { + long now = clock.milliseconds(); + + // Prune stale entries from the recently-expired set to bound memory usage + recentlyExpiredIds.entrySet().removeIf(e -> now - e.getValue() > recentlyExpiredRetentionMs); + + for (Map.Entry entry : scanners.entrySet()) { + ScannerContext context = entry.getValue(); + if (now - context.getLastAccessTime() > scannerTtlMs) { + // Atomic conditional remove to avoid double-close race with removeScanner() + if (scanners.remove(entry.getKey(), context)) { + totalScanners.decrementAndGet(); + // Record the expiry so subsequent lookups can return SCANNER_EXPIRED + recentlyExpiredIds.put(entry.getKey(), now); + LOG.info( + "Scanner {} expired after {}ms idle, closing it.", + scannerIdToString(entry.getKey().array()), + scannerTtlMs); + closeScannerContext(context); + } + } + } + } + + private void closeScannerContext(ScannerContext context) { + try { + context.close(); + } catch (Exception e) { + LOG.error("Failed to close scanner context.", e); + } + } + + private byte[] generateScannerId() { + return UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8); + } + + private String scannerIdToString(byte[] scannerId) { + return new String(scannerId, StandardCharsets.UTF_8); + } + + @Override + public CompletableFuture closeAsync() { + try { + close(); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + + @Override + public void close() throws Exception { + if (cleanupTask != null) { + cleanupTask.cancel(false); + } + // Use the same conditional-remove pattern as the cleanup task so that each ScannerContext + // is closed by exactly one caller even if the cleanup task is 
concurrently running. + for (Map.Entry entry : scanners.entrySet()) { + if (scanners.remove(entry.getKey(), entry.getValue())) { + totalScanners.decrementAndGet(); + closeScannerContext(entry.getValue()); + } + } + recentlyExpiredIds.clear(); + totalScanners.set(0); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 8eed63c844..bf6b3210cc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -41,6 +41,7 @@ import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.remote.RemoteLogManager; @@ -125,6 +126,9 @@ public class TabletServer extends ServerBase { @GuardedBy("lock") private TabletService tabletService; + @GuardedBy("lock") + private ScannerManager scannerManager; + @GuardedBy("lock") private MetricRegistry metricRegistry; @@ -230,6 +234,8 @@ protected void startServices() throws Exception { this.kvManager = KvManager.create(conf, zkClient, logManager, tabletServerMetricGroup); kvManager.startup(); + this.scannerManager = new ScannerManager(conf, scheduler, clock); + // Register kvManager to dynamicConfigManager for dynamic reconfiguration dynamicConfigManager.register(kvManager); // Start dynamicConfigManager after all reconfigurable components are registered @@ -286,6 +292,7 @@ protected void startServices() throws Exception { metadataManager, authorizer, dynamicConfigManager, + scannerManager, ioExecutor); RequestsMetrics requestsMetrics = @@ -428,6 +435,12 @@ CompletableFuture 
stopServices() { scheduler.shutdown(); } + // Close scanner sessions before shutting down kvManager so that + // RocksDB snapshots are released while RocksDB is still open. + if (scannerManager != null) { + scannerManager.close(); + } + if (kvManager != null) { kvManager.shutdown(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 992b963334..507abd34ba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -19,10 +19,13 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.exception.AuthorizationException; +import org.apache.fluss.exception.NonPrimaryKeyTableException; +import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.DefaultValueRecordBatch; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; @@ -50,12 +53,15 @@ import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; import org.apache.fluss.rpc.messages.PrefixLookupRequest; import org.apache.fluss.rpc.messages.PrefixLookupResponse; import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import 
org.apache.fluss.rpc.messages.ScanKvResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; @@ -70,17 +76,25 @@ import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; +import org.apache.fluss.server.entity.StopReplicaData; import org.apache.fluss.server.entity.UserContext; +import org.apache.fluss.server.kv.scan.ScannerContext; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.ListOffsetsParam; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metadata.TabletServerMetadataProvider; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.utils.ServerRpcMessageUtils; import org.apache.fluss.server.zk.ZooKeeperClient; +import org.rocksdb.RocksIterator; + import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -124,9 +138,11 @@ public final class TabletService extends RpcServiceBase implements TabletServerGateway { private final String serviceName; + private final int serverId; private final ReplicaManager replicaManager; private final TabletServerMetadataCache metadataCache; private final TabletServerMetadataProvider metadataFunctionProvider; + private final ScannerManager scannerManager; public TabletService( int serverId, @@ -137,6 +153,7 @@ public TabletService( MetadataManager metadataManager, @Nullable Authorizer authorizer, DynamicConfigManager dynamicConfigManager, + ScannerManager scannerManager, ExecutorService ioExecutor) { super( remoteFileSystem, @@ -147,8 +164,10 @@ 
public TabletService( dynamicConfigManager, ioExecutor); this.serviceName = "server-" + serverId; + this.serverId = serverId; this.replicaManager = replicaManager; this.metadataCache = metadataCache; + this.scannerManager = scannerManager; this.metadataFunctionProvider = new TabletServerMetadataProvider(zkClient, metadataManager, metadataCache); } @@ -285,12 +304,161 @@ public CompletableFuture limitScan(LimitScanRequest request) return response; } + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + CompletableFuture response = new CompletableFuture<>(); + try { + boolean hasScannerId = request.hasScannerId(); + boolean hasBucketScanReq = request.hasBucketScanReq(); + + if (hasScannerId && hasBucketScanReq) { + throw Errors.INVALID_SCAN_REQUEST.exception( + "ScanKv request must provide either a scanner_id or a bucket_scan_req, not both."); + } + + if (hasScannerId) { + byte[] scannerId = request.getScannerId(); + ScannerContext context = scannerManager.getScanner(scannerId); + if (context == null) { + String idStr = new String(scannerId, StandardCharsets.UTF_8); + if (scannerManager.isRecentlyExpired(scannerId)) { + throw Errors.SCANNER_EXPIRED.exception( + "Scanner session expired: " + idStr); + } + throw Errors.UNKNOWN_SCANNER_ID.exception( + "Unknown scanner id: " + idStr); + } + + if (request.hasCloseScanner() && request.isCloseScanner()) { + scannerManager.removeScanner(context); + ScanKvResponse scanResponse = new ScanKvResponse(); + scanResponse.setScannerId(scannerId).setHasMoreResults(false); + response.complete(scanResponse); + return response; + } + + // Validate call sequence to detect reordered or duplicate requests. + // Update callSeqId only after continueScan() succeeds so that clients can safely + // retry with the same callSeqId if the server returns an error response. 
+ int expectedSeq = context.getCallSeqId() + 1; + if (request.getCallSeqId() != expectedSeq) { + throw Errors.INVALID_SCAN_REQUEST.exception( + "Out of order scan request. Expected call_seq_id: " + + expectedSeq + + ", but got: " + + request.getCallSeqId()); + } + ScanKvResponse scanResponse = continueScan(context, request.getBatchSizeBytes()); + context.setCallSeqId(request.getCallSeqId()); + response.complete(scanResponse); + } else { + if (!hasBucketScanReq) { + throw Errors.INVALID_SCAN_REQUEST.exception( + "ScanKv request must provide either a scanner_id or a bucket_scan_req."); + } + PbScanReqForBucket bucketScanReq = request.getBucketScanReq(); + authorizeTable(READ, bucketScanReq.getTableId()); + + TableBucket tb = + new TableBucket( + bucketScanReq.getTableId(), + bucketScanReq.hasPartitionId() + ? bucketScanReq.getPartitionId() + : null, + bucketScanReq.getBucketId()); + Replica replica = replicaManager.getReplicaOrException(tb); + if (!replica.isLeader()) { + throw new NotLeaderOrFollowerException("Leader not local for bucket " + tb); + } + if (!replica.isKvTable()) { + throw new NonPrimaryKeyTableException( + "Table " + bucketScanReq.getTableId() + " is not a primary key table."); + } + + // Capture log offset before opening the snapshot so clients get a lower bound + // on the WAL position consistent with the data they are about to read. + long logHighWatermark = replica.getLogHighWatermark(); + ScannerContext context = scannerManager.createScanner(replica.getKvTablet(), tb); + + if (context == null) { + // Empty bucket: no rows, no scanner registered + ScanKvResponse scanResponse = new ScanKvResponse(); + scanResponse.setHasMoreResults(false).setLogOffset(logHighWatermark); + response.complete(scanResponse); + return response; + } + + ScanKvResponse scanResponse = continueScan(context, request.getBatchSizeBytes()); + // Return the log offset for clients to determine where to resume reading the log + // after the snapshot scan completes. 
+ scanResponse.setLogOffset(logHighWatermark); + response.complete(scanResponse); + } + } catch (Exception e) { + response.complete(makeScanKvErrorResponse(e)); + } + return response; + } + + private ScanKvResponse continueScan(ScannerContext context, int batchSizeBytes) + throws IOException { + final byte[] scannerId = context.getScannerId(); + RocksIterator iterator = context.getIterator(); + DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder(); + int currentBytes = 0; + int rowsInBatch = 0; + + while (iterator.isValid()) { + byte[] value = iterator.value(); + // Always include at least one record; stop if the batch would exceed the size limit + if (rowsInBatch > 0 && currentBytes + value.length > batchSizeBytes) { + break; + } + + builder.append(value); + currentBytes += value.length; + rowsInBatch++; + iterator.next(); + } + + boolean hasMore = iterator.isValid(); + + ScanKvResponse response = new ScanKvResponse(); + response.setScannerId(scannerId).setHasMoreResults(hasMore); + if (rowsInBatch > 0) { + DefaultValueRecordBatch batch = builder.build(); + byte[] records = new byte[batch.sizeInBytes()]; + batch.getSegment().get(0, records); + response.setRecords(records); + } + + if (!hasMore) { + scannerManager.removeScanner(context); + } + + return response; + } + + private ScanKvResponse makeScanKvErrorResponse(Throwable e) { + ScanKvResponse response = new ScanKvResponse(); + ApiError error = ApiError.fromThrowable(e); + response.setErrorCode(error.error().code()).setErrorMessage(error.message()); + return response; + } + @Override public CompletableFuture notifyLeaderAndIsr( NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) { CompletableFuture response = new CompletableFuture<>(); List notifyLeaderAndIsrRequestData = getNotifyLeaderAndIsrRequestData(notifyLeaderAndIsrRequest); + // Close scanner sessions only for buckets where this server is becoming a follower. 
+ // Buckets where this server remains or becomes the leader keep their snapshots valid. + for (NotifyLeaderAndIsrData data : notifyLeaderAndIsrRequestData) { + if (data.getLeader() != serverId) { + scannerManager.closeScannersForBucket(data.getTableBucket()); + } + } replicaManager.becomeLeaderOrFollower( notifyLeaderAndIsrRequest.getCoordinatorEpoch(), notifyLeaderAndIsrRequestData, @@ -326,9 +494,14 @@ public CompletableFuture updateMetadata(UpdateMetadataRe public CompletableFuture stopReplica( StopReplicaRequest stopReplicaRequest) { CompletableFuture response = new CompletableFuture<>(); + List stopReplicaData = getStopReplicaData(stopReplicaRequest); + // Close any active scanner sessions for all buckets being stopped to avoid leaks. + for (StopReplicaData data : stopReplicaData) { + scannerManager.closeScannersForBucket(data.getTableBucket()); + } replicaManager.stopReplicas( stopReplicaRequest.getCoordinatorEpoch(), - getStopReplicaData(stopReplicaRequest), + stopReplicaData, result -> response.complete(makeStopReplicaResponse(result))); return response; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 500d197fcf..dc7e15dccc 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -81,6 +81,8 @@ import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; @@ -204,6 +206,11 @@ public CompletableFuture 
limitScan(LimitScanRequest request) return null; } + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + return null; + } + @Override public CompletableFuture listOffsets(ListOffsetsRequest request) { return null;