NOTE: returning empty records guarantees the consumed position are NOT updated.
+ *
+ * @return The fetched records per partition
+ * @throws FetchException If there is OffsetOutOfRange error in fetchResponse and the
+ * defaultResetPolicy is NONE
+ */
+ public R collectFetch(final LogFetchBuffer logFetchBuffer) {
+ Map NOTE: returning empty records guarantees the consumed position are NOT updated.
- *
- * @return The fetched records per partition
- * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
- * the defaultResetPolicy is NONE
- */
- public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
- Map This method is intentionally kept off the public {@link Scan} API surface for now.
+ */
+ @Internal
+ public ArrowScanRecords pollRecordBatch(Duration timeout) {
+ if (!arrowLogFormat) {
+ throw new UnsupportedOperationException(
+ "Arrow record batch polling is only supported for tables whose log format is ARROW.");
+ }
+ if (!isLogTable) {
+ throw new UnsupportedOperationException(
+ "Arrow record batch polling is only supported for log tables. CDC scanning is not supported.");
}
+ if (hasProjection) {
+ throw new UnsupportedOperationException(
+ "Arrow record batch polling does not support projection. Please create the scanner without projection.");
+ }
+
+ return doPoll(
+ timeout,
+ this::pollForRecordBatches,
+ ArrowScanRecords::isEmpty,
+ () -> ArrowScanRecords.EMPTY);
}
@Override
@@ -178,8 +180,8 @@ public void subscribe(int bucket, long offset) {
acquireAndEnsureOpen();
try {
TableBucket tableBucket = new TableBucket(tableId, bucket);
- this.metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath));
- this.logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset));
+ metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath));
+ logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset));
} finally {
release();
}
@@ -199,9 +201,9 @@ public void subscribe(long partitionId, int bucket, long offset) {
// we make assumption that the partition id must belong to the current table
// if we can't find the partition id from the table path, we'll consider the table
// is not exist
- this.metadataUpdater.checkAndUpdatePartitionMetadata(
+ metadataUpdater.checkAndUpdatePartitionMetadata(
tablePath, Collections.singleton(partitionId));
- this.logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset));
+ logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset));
} finally {
release();
}
@@ -216,7 +218,7 @@ public void unsubscribe(long partitionId, int bucket) {
acquireAndEnsureOpen();
try {
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
- this.logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket));
+ logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket));
} finally {
release();
}
@@ -233,7 +235,7 @@ public void unsubscribe(int bucket) {
acquireAndEnsureOpen();
try {
TableBucket tableBucket = new TableBucket(tableId, bucket);
- this.logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket));
+ logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket));
} finally {
release();
}
@@ -252,12 +254,66 @@ private ScanRecords pollForFetches() {
// send any new fetches (won't resend pending fetches).
logFetcher.sendFetches();
-
return logFetcher.collectFetch();
}
+ private ArrowScanRecords pollForRecordBatches() {
+ ArrowScanRecords scanRecords = logFetcher.collectArrowFetch();
+ if (!scanRecords.isEmpty()) {
+ return scanRecords;
+ }
+
+ // send any new fetches (won't resend pending fetches).
+ logFetcher.sendFetches();
+ return logFetcher.collectArrowFetch();
+ }
+
+ /** Shared polling loop for row and Arrow scan results. */
+ private
This class only supports append-only log tables. CDC tables are not supported. + * + *
The caller must close this object after use in order to release the underlying Arrow memory.
+ */
+@Internal
+public class ArrowBatchData implements AutoCloseable {
+
+ private final VectorSchemaRoot vectorSchemaRoot;
+ private final BufferAllocator allocator;
+ private final long baseLogOffset;
+ private final long timestamp;
+ private final int schemaId;
+
+ public ArrowBatchData(
+ VectorSchemaRoot vectorSchemaRoot,
+ BufferAllocator allocator,
+ long baseLogOffset,
+ long timestamp,
+ int schemaId) {
+ this.vectorSchemaRoot = checkNotNull(vectorSchemaRoot, "vectorSchemaRoot must not be null");
+ this.allocator = checkNotNull(allocator, "allocator must not be null");
+ this.baseLogOffset = baseLogOffset;
+ this.timestamp = timestamp;
+ this.schemaId = schemaId;
+ }
+
+ /** Returns the Arrow vectors of this batch. */
+ public VectorSchemaRoot getVectorSchemaRoot() {
+ return vectorSchemaRoot;
+ }
+
+ /** Returns the schema id of this batch. */
+ public int getSchemaId() {
+ return schemaId;
+ }
+
+ /** Returns the base log offset of this batch. */
+ public long getBaseLogOffset() {
+ return baseLogOffset;
+ }
+
+ /** Returns the commit timestamp of this batch. */
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ /** Returns the number of rows in this batch. */
+ public int getRecordCount() {
+ return vectorSchemaRoot.getRowCount();
+ }
+
+ /** Returns the log offset of the given row. */
+ public long getLogOffset(int rowId) {
+ validateRowId(rowId);
+ return baseLogOffset + rowId;
+ }
+
+ private void validateRowId(int rowId) {
+ checkArgument(
+ rowId >= 0 && rowId < getRecordCount(),
+ "rowId must be in [0, %s), but is %s",
+ getRecordCount(),
+ rowId);
+ }
+
+ @Override
+ public void close() {
+ try {
+ vectorSchemaRoot.close();
+ } finally {
+ allocator.close();
+ }
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ArrowRecordBatchContext.java b/fluss-common/src/main/java/org/apache/fluss/record/ArrowRecordBatchContext.java
new file mode 100644
index 0000000000..64cf19ba42
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/record/ArrowRecordBatchContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.memory.MemorySegment;
+
+/** Internal context for loading Arrow record batches with unshaded Arrow resources. */
+@Internal
+interface ArrowRecordBatchContext extends LogRecordBatch.ReadContext {
+
+ /** Creates a batch-scoped access wrapper for unshaded Arrow resources. */
+ UnshadedArrowBatchAccess createUnshadedArrowBatchAccess(int schemaId);
+
+ /** Internal wrapper that hides unshaded Arrow types from shared signatures. */
+ @Internal
+ interface UnshadedArrowBatchAccess extends AutoCloseable {
+
+ /** Loads one Arrow batch from the given memory segment into the internal read root. */
+ void loadArrowBatch(MemorySegment segment, int arrowOffset, int arrowLength);
+
+ /**
+ * Applies schema-evolution projection (if needed) internally and builds the final {@link
+ * ArrowBatchData}, transferring ownership to the caller.
+ */
+ ArrowBatchData createArrowBatchData(long baseLogOffset, long timestamp, int schemaId);
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index 4d9c7fd040..692cf0c9ac 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -31,6 +31,7 @@
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ArrowUtils;
import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.MurmurHashUtils;
import org.apache.fluss.utils.crc.Crc32C;
@@ -62,6 +63,7 @@
import static org.apache.fluss.record.LogRecordBatchFormat.statisticsDataOffset;
import static org.apache.fluss.record.LogRecordBatchFormat.statisticsLengthOffset;
import static org.apache.fluss.record.LogRecordBatchFormat.writeClientIdOffset;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -262,6 +264,38 @@ public CloseableIterator This method is only supported for Arrow log batches.
+ */
+ default ArrowBatchData loadArrowBatch(ReadContext context) {
+ throw new UnsupportedOperationException(
+ "loadArrowBatch is only supported for ARROW log format.");
+ }
+
/** The read context of a {@link LogRecordBatch} to read records. */
interface ReadContext {
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 2b0f695ecf..42324edd5f 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -18,6 +18,7 @@
package org.apache.fluss.record;
import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaGetter;
@@ -31,8 +32,10 @@
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ArrowUtils;
+import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.MapUtils;
import org.apache.fluss.utils.Projection;
+import org.apache.fluss.utils.UnshadedArrowReadUtils;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -41,11 +44,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
/** A simple implementation for {@link LogRecordBatch.ReadContext}. */
@ThreadSafe
-public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoCloseable {
+public class LogRecordReadContext
+ implements LogRecordBatch.ReadContext, ArrowRecordBatchContext, AutoCloseable {
// the log format of the table
private final LogFormat logFormat;
@@ -54,14 +56,16 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo
// the static schemaId of the table, should support dynamic schema evolution in the future
private final int targetSchemaId;
// the Arrow memory buffer allocator for the table, should be null if not ARROW log format
- @Nullable private final BufferAllocator bufferAllocator;
- // the final selected fields of the read data
+ @Nullable private volatile BufferAllocator bufferAllocator;
+ @Nullable private volatile AutoCloseable unshadedBufferAllocator;
private final FieldGetter[] selectedFieldGetters;
// whether the projection is push downed to the server side and the returned data is pruned.
private final boolean projectionPushDowned;
private final SchemaGetter schemaGetter;
private final ConcurrentHashMap Each entry maps an output column to its position in the batch's schema. A value of {@code
+ * -1} means the column does not exist in the batch's schema and should be filled with nulls.
+ */
+ @Nullable
+ private int[] getSchemaEvolutionMapping(int schemaId) {
+ ProjectedRow projectedRow = getOutputProjectedRow(schemaId);
+ if (projectedRow == null) {
+ return null;
+ }
+ return projectedRow.getIndexMapping();
+ }
+
@Override
public VectorSchemaRoot getVectorSchemaRoot(int schemaId) {
if (logFormat != LogFormat.ARROW) {
@@ -254,12 +288,19 @@ public VectorSchemaRoot getVectorSchemaRoot(int schemaId) {
"Only Arrow log format provides vector schema root.");
}
- RowType rowType = getRowType(schemaId);
- return vectorSchemaRootMap.computeIfAbsent(
- schemaId,
- (id) ->
- VectorSchemaRoot.create(
- ArrowUtils.toArrowSchema(rowType), bufferAllocator));
+ synchronized (arrowResourceLock) {
+ VectorSchemaRoot vectorSchemaRoot = vectorSchemaRootMap.get(schemaId);
+ if (vectorSchemaRoot != null) {
+ return vectorSchemaRoot;
+ }
+
+ RowType rowType = getRowType(schemaId);
+ vectorSchemaRoot =
+ VectorSchemaRoot.create(
+ ArrowUtils.toArrowSchema(rowType), getOrCreateBufferAllocator());
+ vectorSchemaRootMap.put(schemaId, vectorSchemaRoot);
+ return vectorSchemaRoot;
+ }
}
@Override
@@ -267,8 +308,16 @@ public BufferAllocator getBufferAllocator() {
if (logFormat != LogFormat.ARROW) {
throw new IllegalArgumentException("Only Arrow log format provides buffer allocator.");
}
- checkNotNull(bufferAllocator, "The buffer allocator is not available.");
- return bufferAllocator;
+ return getOrCreateBufferAllocator();
+ }
+
+ @Override
+ public UnshadedArrowBatchAccess createUnshadedArrowBatchAccess(int schemaId) {
+ if (logFormat != LogFormat.ARROW) {
+ throw new IllegalArgumentException(
+ "Only Arrow log format provides unshaded Arrow resources.");
+ }
+ return new UnshadedArrowBatchAccessImpl(schemaId);
}
@Nullable
@@ -284,9 +333,104 @@ public ProjectedRow getOutputProjectedRow(int schemaId) {
}
public void close() {
- vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close);
- if (bufferAllocator != null) {
- bufferAllocator.close();
+ synchronized (arrowResourceLock) {
+ vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close);
+ vectorSchemaRootMap.clear();
+ if (bufferAllocator != null) {
+ bufferAllocator.close();
+ bufferAllocator = null;
+ }
+ }
+
+ synchronized (unshadedArrowResourceLock) {
+ IOUtils.closeQuietly(unshadedBufferAllocator);
+ unshadedBufferAllocator = null;
+ }
+ }
+
+ private BufferAllocator getOrCreateBufferAllocator() {
+ BufferAllocator allocator = bufferAllocator;
+ if (allocator != null) {
+ return allocator;
+ }
+
+ synchronized (arrowResourceLock) {
+ if (bufferAllocator == null) {
+ bufferAllocator = new RootAllocator(Long.MAX_VALUE);
+ }
+ return bufferAllocator;
+ }
+ }
+
+ private AutoCloseable getOrCreateUnshadedBufferAllocator() {
+ AutoCloseable allocator = unshadedBufferAllocator;
+ if (allocator != null) {
+ return allocator;
+ }
+
+ synchronized (unshadedArrowResourceLock) {
+ if (unshadedBufferAllocator == null) {
+ unshadedBufferAllocator = new org.apache.arrow.memory.RootAllocator(Long.MAX_VALUE);
+ }
+ return unshadedBufferAllocator;
+ }
+ }
+
+ private final class UnshadedArrowBatchAccessImpl implements UnshadedArrowBatchAccess {
+ private final org.apache.arrow.memory.BufferAllocator allocator;
+ private org.apache.arrow.vector.VectorSchemaRoot readRoot;
+ private org.apache.arrow.vector.VectorSchemaRoot outputRoot;
+ private boolean ownershipTransferred;
+
+ private UnshadedArrowBatchAccessImpl(int schemaId) {
+ org.apache.arrow.memory.BufferAllocator parentAllocator =
+ (org.apache.arrow.memory.BufferAllocator) getOrCreateUnshadedBufferAllocator();
+ this.allocator =
+ parentAllocator.newChildAllocator("log-record-read-batch", 0, Long.MAX_VALUE);
+ this.readRoot =
+ org.apache.arrow.vector.VectorSchemaRoot.create(
+ org.apache.fluss.utils.UnshadedArrowReadUtils.toArrowSchema(
+ getRowType(schemaId)),
+ allocator);
+ this.outputRoot = readRoot;
+ }
+
+ @Override
+ public void loadArrowBatch(MemorySegment segment, int arrowOffset, int arrowLength) {
+ UnshadedArrowReadUtils.loadArrowBatch(
+ segment, arrowOffset, arrowLength, readRoot, allocator);
+ }
+
+ @Override
+ public ArrowBatchData createArrowBatchData(
+ long baseLogOffset, long timestamp, int schemaId) {
+ int[] schemaMapping = getSchemaEvolutionMapping(schemaId);
+ if (schemaMapping != null) {
+ outputRoot =
+ UnshadedArrowReadUtils.projectVectorSchemaRoot(
+ readRoot, dataRowType, schemaMapping, allocator);
+ readRoot.close();
+ readRoot = null;
+ }
+ ArrowBatchData arrowBatchData =
+ new ArrowBatchData(outputRoot, allocator, baseLogOffset, timestamp, schemaId);
+ ownershipTransferred = true;
+ outputRoot = null;
+ readRoot = null;
+ return arrowBatchData;
+ }
+
+ @Override
+ public void close() {
+ if (ownershipTransferred) {
+ return;
+ }
+
+ IOUtils.closeQuietly(outputRoot);
+ if (outputRoot != readRoot) {
+ IOUtils.closeQuietly(readRoot);
+ }
+ IOUtils.closeQuietly(allocator);
}
}
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java b/fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java
new file mode 100644
index 0000000000..7798f8ef6e
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.util.Collections2;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.TypeLayout;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.CompressionUtil.CodecType;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Unshaded variant of {@link FlussVectorLoader} for scanner/read path. */
+public class UnshadedFlussVectorLoader {
+ private final VectorSchemaRoot root;
+ private final CompressionCodec.Factory factory;
+ private boolean decompressionNeeded;
+
+ public UnshadedFlussVectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) {
+ this.root = root;
+ this.factory = factory;
+ }
+
+ public void load(ArrowRecordBatch recordBatch) {
+ Iterator