From 433fd85ec5b1e338cbc9285d2a047f020ec9af30 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 2 Apr 2026 17:17:46 +0800 Subject: [PATCH] [client] Support log scanner scan to arrow record batch --- fluss-client/pom.xml | 14 + .../log/AbstractLogFetchCollector.java | 240 ++++++++++++++++++ .../scanner/log/ArrowLogFetchCollector.java | 108 ++++++++ .../table/scanner/log/ArrowScanRecords.java | 103 ++++++++ .../table/scanner/log/CompletedFetch.java | 104 ++++++-- .../table/scanner/log/LogFetchCollector.java | 200 +-------------- .../client/table/scanner/log/LogFetcher.java | 7 + .../table/scanner/log/LogScannerImpl.java | 158 ++++++++---- .../table/scanner/log/LogScannerITCase.java | 85 +++++++ fluss-common/pom.xml | 14 + .../UnshadedArrowCompressionFactory.java | 60 +++++ .../UnshadedLz4ArrowCompressionCodec.java | 93 +++++++ .../UnshadedZstdArrowCompressionCodec.java | 112 ++++++++ .../apache/fluss/record/ArrowBatchData.java | 104 ++++++++ .../fluss/record/ArrowRecordBatchContext.java | 43 ++++ .../fluss/record/DefaultLogRecordBatch.java | 40 ++- .../fluss/record/FileLogInputStream.java | 5 + .../apache/fluss/record/LogRecordBatch.java | 10 + .../fluss/record/LogRecordReadContext.java | 186 ++++++++++++-- .../record/UnshadedFlussVectorLoader.java | 132 ++++++++++ .../org/apache/fluss/row/ProjectedRow.java | 10 +- .../fluss/utils/UnshadedArrowReadUtils.java | 126 +++++++++ .../MemoryLogRecordsArrowBuilderTest.java | 8 + 23 files changed, 1679 insertions(+), 283 deletions(-) create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogFetchCollector.java create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/compression/UnshadedArrowCompressionFactory.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/compression/UnshadedLz4ArrowCompressionCodec.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/compression/UnshadedZstdArrowCompressionCodec.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/record/ArrowRecordBatchContext.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java diff --git a/fluss-client/pom.xml b/fluss-client/pom.xml index 0f3f5a5342..3b612f7a83 100644 --- a/fluss-client/pom.xml +++ b/fluss-client/pom.xml @@ -49,6 +49,20 @@ ${project.version} + + org.apache.arrow + arrow-vector + ${arrow.version} + provided + + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + provided + + org.apache.fluss diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogFetchCollector.java new file mode 100644 index 0000000000..cbcece6ef1 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogFetchCollector.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.AuthorizationException; +import org.apache.fluss.exception.FetchException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.rpc.protocol.Errors; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Shared implementation for polling completed fetches into scanner results. */ +@ThreadSafe +@Internal +abstract class AbstractLogFetchCollector { + protected final Logger log; + protected final TablePath tablePath; + protected final LogScannerStatus logScannerStatus; + private final int maxPollRecords; + private final MetadataUpdater metadataUpdater; + + protected AbstractLogFetchCollector( + Logger log, + TablePath tablePath, + LogScannerStatus logScannerStatus, + Configuration conf, + MetadataUpdater metadataUpdater) { + this.log = log; + this.tablePath = tablePath; + this.logScannerStatus = logScannerStatus; + this.maxPollRecords = conf.getInt(ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS); + this.metadataUpdater = metadataUpdater; + } + + /** + * Return the fetched log records, empty the record buffer and update the consumed position. + * + *

NOTE: returning empty records guarantees the consumed position are NOT updated. + * + * @return The fetched records per partition + * @throws FetchException If there is OffsetOutOfRange error in fetchResponse and the + * defaultResetPolicy is NONE + */ + public R collectFetch(final LogFetchBuffer logFetchBuffer) { + Map> fetched = new HashMap<>(); + int recordsRemaining = maxPollRecords; + + try { + while (recordsRemaining > 0) { + CompletedFetch nextInLineFetch = logFetchBuffer.nextInLineFetch(); + if (nextInLineFetch == null || nextInLineFetch.isConsumed()) { + CompletedFetch completedFetch = logFetchBuffer.peek(); + if (completedFetch == null) { + break; + } + + if (!completedFetch.isInitialized()) { + try { + logFetchBuffer.setNextInLineFetch(initialize(completedFetch)); + } catch (Exception e) { + // Remove a completedFetch upon a parse with exception if + // (1) it contains no records, and + // (2) there are no fetched records with actual content + // preceding this exception. + if (fetched.isEmpty() && completedFetch.sizeInBytes == 0) { + logFetchBuffer.poll(); + } + throw e; + } + } else { + logFetchBuffer.setNextInLineFetch(completedFetch); + } + + logFetchBuffer.poll(); + } else { + List records = fetchRecords(nextInLineFetch, recordsRemaining); + if (!records.isEmpty()) { + TableBucket tableBucket = nextInLineFetch.tableBucket; + List currentRecords = fetched.get(tableBucket); + if (currentRecords == null) { + fetched.put(tableBucket, records); + } else { + // this case shouldn't usually happen because we only send one fetch + // at a time per bucket, but it might conceivably happen in some rare + // cases (such as bucket leader changes). we have to copy to a new list + // because the old one may be immutable + List mergedRecords = + new ArrayList<>(records.size() + currentRecords.size()); + mergedRecords.addAll(currentRecords); + mergedRecords.addAll(records); + fetched.put(tableBucket, mergedRecords); + } + + recordsRemaining -= recordCount(records); + } + } + } + } catch (FetchException e) { + if (fetched.isEmpty()) { + throw e; + } + } + + return toResult(fetched); + } + + /** Initialize a {@link CompletedFetch} object. */ + @Nullable + private CompletedFetch initialize(CompletedFetch completedFetch) { + TableBucket tb = completedFetch.tableBucket; + ApiError error = completedFetch.error; + + try { + if (error.isSuccess()) { + return handleInitializeSuccess(completedFetch); + } else { + handleInitializeErrors(completedFetch, error.error(), error.messageWithFallback()); + return null; + } + } finally { + if (error.isFailure()) { + // we move the bucket to the end if there was an error. This way, + // it's more likely that buckets for the same table can remain together + // (allowing for more efficient serialization). + logScannerStatus.moveBucketToEnd(tb); + } + } + } + + private @Nullable CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) { + TableBucket tb = completedFetch.tableBucket; + long fetchOffset = completedFetch.nextFetchOffset(); + + // we are interested in this fetch only if the beginning offset matches the + // current consumed position. + Long offset = logScannerStatus.getBucketOffset(tb); + if (offset == null) { + log.debug( + "Discarding stale fetch response for bucket {} since the expected offset is null which means the bucket has been unsubscribed.", + tb); + return null; + } + if (offset != fetchOffset) { + log.warn( + "Discarding stale fetch response for bucket {} since its offset {} does not match the expected offset {}.", + tb, + fetchOffset, + offset); + return null; + } + + long highWatermark = completedFetch.highWatermark; + if (highWatermark >= 0) { + log.trace("Updating high watermark for bucket {} to {}.", tb, highWatermark); + logScannerStatus.updateHighWatermark(tb, highWatermark); + } + + completedFetch.setInitialized(); + return completedFetch; + } + + private void handleInitializeErrors( + CompletedFetch completedFetch, Errors error, String errorMessage) { + TableBucket tb = completedFetch.tableBucket; + long fetchOffset = completedFetch.nextFetchOffset(); + if (error == Errors.NOT_LEADER_OR_FOLLOWER + || error == Errors.LOG_STORAGE_EXCEPTION + || error == Errors.KV_STORAGE_EXCEPTION + || error == Errors.STORAGE_EXCEPTION + || error == Errors.FENCED_LEADER_EPOCH_EXCEPTION) { + log.debug( + "Error in fetch for bucket {}: {}:{}", + tb, + error.exceptionName(), + error.exception(errorMessage)); + metadataUpdater.checkAndUpdateMetadata(tablePath, tb); + } else if (error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION) { + log.warn("Received unknown table or bucket error in fetch for bucket {}", tb); + metadataUpdater.checkAndUpdateMetadata(tablePath, tb); + } else if (error == Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION) { + throw new FetchException( + String.format( + "The fetching offset %s is out of range: %s", + fetchOffset, error.exception(errorMessage))); + } else if (error == Errors.AUTHORIZATION_EXCEPTION) { + throw new AuthorizationException(errorMessage); + } else if (error == Errors.UNKNOWN_SERVER_ERROR) { + log.warn( + "Unknown server error while fetching offset {} for bucket {}: {}", + fetchOffset, + tb, + error.exception(errorMessage)); + } else if (error == Errors.CORRUPT_MESSAGE) { + throw new FetchException( + String.format( + "Encountered corrupt message when fetching offset %s for bucket %s: %s", + fetchOffset, tb, error.exception(errorMessage))); + } else { + throw new FetchException( + String.format( + "Unexpected error code %s while fetching at offset %s from bucket %s: %s", + error, fetchOffset, tb, error.exception(errorMessage))); + } + } + + protected abstract List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords); + + protected abstract int recordCount(List fetchedRecords); + + protected abstract R toResult(Map> fetchedRecords); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java new file mode 100644 index 0000000000..c16aa944bc --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.ArrowBatchData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Collects Arrow batches from completed fetches. */ +@ThreadSafe +@Internal +public class ArrowLogFetchCollector + extends AbstractLogFetchCollector { + private static final Logger LOG = LoggerFactory.getLogger(ArrowLogFetchCollector.class); + + public ArrowLogFetchCollector( + TablePath tablePath, + LogScannerStatus logScannerStatus, + Configuration conf, + MetadataUpdater metadataUpdater) { + super(LOG, tablePath, logScannerStatus, conf, metadataUpdater); + } + + @Override + protected List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { + TableBucket tb = nextInLineFetch.tableBucket; + Long offset = logScannerStatus.getBucketOffset(tb); + if (offset == null) { + LOG.debug( + "Ignoring fetched records for {} at offset {} since the current offset is null which means the bucket has been unsubscribed.", + tb, + nextInLineFetch.nextFetchOffset()); + } else { + if (nextInLineFetch.nextFetchOffset() == offset) { + List batches = nextInLineFetch.fetchArrowBatches(maxRecords); + LOG.trace( + "Returning {} fetched arrow batches at offset {} for assigned bucket {}.", + batches.size(), + offset, + tb); + + if (nextInLineFetch.nextFetchOffset() > offset) { + LOG.trace( + "Updating fetch offset from {} to {} for bucket {} and returning {} arrow batches from poll()", + offset, + nextInLineFetch.nextFetchOffset(), + tb, + batches.size()); + logScannerStatus.updateOffset(tb, nextInLineFetch.nextFetchOffset()); + } + return batches; + } else { + // these records aren't next in line based on the last consumed offset, ignore them + // they must be from an obsolete request + LOG.warn( + "Ignoring fetched records for {} at offset {} since the current offset is {}", + nextInLineFetch.tableBucket, + nextInLineFetch.nextFetchOffset(), + offset); + } + } + + LOG.trace("Draining fetched records for bucket {}", nextInLineFetch.tableBucket); + nextInLineFetch.drain(); + return Collections.emptyList(); + } + + @Override + protected int recordCount(List fetchedRecords) { + int count = 0; + for (ArrowBatchData fetchedRecord : fetchedRecords) { + count += fetchedRecord.getRecordCount(); + } + return count; + } + + @Override + protected ArrowScanRecords toResult(Map> fetchedRecords) { + return new ArrowScanRecords(fetchedRecords); + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java new file mode 100644 index 0000000000..ad289ced29 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.ArrowBatchData; +import org.apache.fluss.utils.AbstractIterator; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** A container that holds the scanned Arrow batches per bucket for a particular table. */ +@Internal +public class ArrowScanRecords implements Iterable { + public static final ArrowScanRecords EMPTY = new ArrowScanRecords(Collections.emptyMap()); + + private final Map> records; + + public ArrowScanRecords(Map> records) { + this.records = records; + } + + /** Get just the Arrow batches for the given bucket. */ + public List records(TableBucket scanBucket) { + List recs = records.get(scanBucket); + if (recs == null) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(recs); + } + + /** Returns the buckets that contain Arrow batches. */ + public Set buckets() { + return Collections.unmodifiableSet(records.keySet()); + } + + /** Returns the total number of rows in all batches. */ + public int count() { + int count = 0; + for (List recs : records.values()) { + for (ArrowBatchData rec : recs) { + count += rec.getRecordCount(); + } + } + return count; + } + + public boolean isEmpty() { + return records.isEmpty(); + } + + @Override + public Iterator iterator() { + return new ConcatenatedIterable(records.values()).iterator(); + } + + private static class ConcatenatedIterable implements Iterable { + + private final Iterable> iterables; + + private ConcatenatedIterable(Iterable> iterables) { + this.iterables = iterables; + } + + @Override + public Iterator iterator() { + return new AbstractIterator() { + final Iterator> iters = iterables.iterator(); + Iterator current; + + public ArrowBatchData makeNext() { + while (current == null || !current.hasNext()) { + if (iters.hasNext()) { + current = iters.next().iterator(); + } else { + return allDone(); + } + } + return current.next(); + } + }; + } + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java index e080ff6692..76daf776a1 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java @@ -22,6 +22,7 @@ import org.apache.fluss.exception.CorruptRecordException; import org.apache.fluss.exception.FetchException; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.ArrowBatchData; import org.apache.fluss.record.CompactedLogRecord; import org.apache.fluss.record.IndexedLogRecord; import org.apache.fluss.record.LogRecord; @@ -48,7 +49,7 @@ * contains logic to maintain state between calls to {@link #fetchRecords(int)}. */ @Internal -abstract class CompletedFetch { +public abstract class CompletedFetch { static final Logger LOG = LoggerFactory.getLogger(CompletedFetch.class); final TableBucket tableBucket; @@ -224,29 +225,71 @@ public List fetchRecords(int maxRecords) { return scanRecords; } + /** + * The {@link LogRecordBatch batches} are loaded as {@link ArrowBatchData Arrow batches} and + * returned. + * + * @param maxRecords The maximum number of records to return; the actual number returned may be + * less + * @return {@link ArrowBatchData Arrow batches} + */ + public List fetchArrowBatches(int maxRecords) { + if (isConsumed) { + return Collections.emptyList(); + } + + List arrowBatches = new ArrayList<>(); + int recordsFetched = 0; + try { + while (recordsFetched < maxRecords || arrowBatches.isEmpty()) { + LogRecordBatch batch = nextFetchedBatch(); + if (batch == null) { + break; + } + + ArrowBatchData arrowBatchData = batch.loadArrowBatch(readContext); + if (arrowBatchData.getRecordCount() == 0) { + arrowBatchData.close(); + nextFetchOffset = batch.nextLogOffset(); + continue; + } + + arrowBatches.add(arrowBatchData); + recordsRead += arrowBatchData.getRecordCount(); + recordsFetched += arrowBatchData.getRecordCount(); + nextFetchOffset = batch.nextLogOffset(); + } + } catch (Exception e) { + closeArrowBatches(arrowBatches); + throw new FetchException( + "Received exception when fetching the next Arrow batch from " + + tableBucket + + ". If needed, please back past the batch to continue scanning.", + e); + } + + return arrowBatches; + } + + private void closeArrowBatches(List arrowBatches) { + for (ArrowBatchData arrowBatch : arrowBatches) { + try { + arrowBatch.close(); + } catch (Exception e) { + LOG.warn("Failed to close Arrow batch for bucket {}", tableBucket, e); + } + } + } + private LogRecord nextFetchedRecord() throws Exception { while (true) { if (records == null || !records.hasNext()) { - maybeCloseRecordStream(); - - if (!batches.hasNext()) { - // In batch, we preserve the last offset in a batch. By using the next offset - // computed from the last offset in the batch, we ensure that the offset of the - // next fetch will point to the next batch, which avoids unnecessary re-fetching - // of the same batch (in the worst case, the scanner could get stuck fetching - // the same batch repeatedly). - if (currentBatch != null) { - nextFetchOffset = currentBatch.nextLogOffset(); - } - drain(); + LogRecordBatch batch = nextFetchedBatch(); + if (batch == null) { return null; } - currentBatch = batches.next(); - // TODO get last epoch. - maybeEnsureValid(currentBatch); - - records = currentBatch.records(readContext); + records = batch.records(readContext); } else { LogRecord record = records.next(); // skip any records out of range. @@ -257,6 +300,31 @@ private LogRecord nextFetchedRecord() throws Exception { } } + private LogRecordBatch nextFetchedBatch() { + maybeCloseRecordStream(); + if (!batches.hasNext()) { + finishFetchedBatches(); + return null; + } + + currentBatch = batches.next(); + // TODO get last epoch. + maybeEnsureValid(currentBatch); + return currentBatch; + } + + private void finishFetchedBatches() { + // In batch, we preserve the last offset in a batch. By using the next offset + // computed from the last offset in the batch, we ensure that the offset of the + // next fetch will point to the next batch, which avoids unnecessary re-fetching + // of the same batch (in the worst case, the scanner could get stuck fetching + // the same batch repeatedly). + if (currentBatch != null) { + nextFetchOffset = currentBatch.nextLogOffset(); + } + drain(); + } + private void maybeEnsureValid(LogRecordBatch batch) { if (isCheckCrcs) { if (readContext.isProjectionPushDowned()) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index e690eed8a1..3e04cdd11a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -20,27 +20,18 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.table.scanner.ScanRecord; -import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.AuthorizationException; -import org.apache.fluss.exception.FetchException; -import org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.LogRecord; import org.apache.fluss.record.LogRecordBatch; -import org.apache.fluss.rpc.protocol.ApiError; -import org.apache.fluss.rpc.protocol.Errors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -55,104 +46,24 @@ */ @ThreadSafe @Internal -public class LogFetchCollector { +public class LogFetchCollector extends AbstractLogFetchCollector { private static final Logger LOG = LoggerFactory.getLogger(LogFetchCollector.class); - private final TablePath tablePath; - private final LogScannerStatus logScannerStatus; - private final int maxPollRecords; - private final MetadataUpdater metadataUpdater; - public LogFetchCollector( TablePath tablePath, LogScannerStatus logScannerStatus, Configuration conf, MetadataUpdater metadataUpdater) { - this.tablePath = tablePath; - this.logScannerStatus = logScannerStatus; - this.maxPollRecords = conf.getInt(ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS); - this.metadataUpdater = metadataUpdater; + super(LOG, tablePath, logScannerStatus, conf, metadataUpdater); } - /** - * Return the fetched log records, empty the record buffer and update the consumed position. - * - *

NOTE: returning empty records guarantees the consumed position are NOT updated. - * - * @return The fetched records per partition - * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and - * the defaultResetPolicy is NONE - */ - public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { - Map> fetched = new HashMap<>(); - int recordsRemaining = maxPollRecords; - - try { - while (recordsRemaining > 0) { - CompletedFetch nextInLineFetch = logFetchBuffer.nextInLineFetch(); - if (nextInLineFetch == null || nextInLineFetch.isConsumed()) { - CompletedFetch completedFetch = logFetchBuffer.peek(); - if (completedFetch == null) { - break; - } - - if (!completedFetch.isInitialized()) { - try { - logFetchBuffer.setNextInLineFetch(initialize(completedFetch)); - } catch (Exception e) { - // Remove a completedFetch upon a parse with exception if - // (1) it contains no records, and - // (2) there are no fetched records with actual content preceding this - // exception. - if (fetched.isEmpty() && completedFetch.sizeInBytes == 0) { - logFetchBuffer.poll(); - } - throw e; - } - } else { - logFetchBuffer.setNextInLineFetch(completedFetch); - } - - logFetchBuffer.poll(); - } else { - List records = fetchRecords(nextInLineFetch, recordsRemaining); - if (!records.isEmpty()) { - TableBucket tableBucket = nextInLineFetch.tableBucket; - List currentRecords = fetched.get(tableBucket); - if (currentRecords == null) { - fetched.put(tableBucket, records); - } else { - // this case shouldn't usually happen because we only send one fetch at - // a time per bucket, but it might conceivably happen in some rare - // cases (such as bucket leader changes). we have to copy to a new list - // because the old one may be immutable - List newScanRecords = - new ArrayList<>(records.size() + currentRecords.size()); - newScanRecords.addAll(currentRecords); - newScanRecords.addAll(records); - fetched.put(tableBucket, newScanRecords); - } - - recordsRemaining -= records.size(); - } - } - } - } catch (FetchException e) { - if (fetched.isEmpty()) { - throw e; - } - } - - return new ScanRecords(fetched); - } - - private List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { + @Override + protected List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { TableBucket tb = nextInLineFetch.tableBucket; Long offset = logScannerStatus.getBucketOffset(tb); if (offset == null) { LOG.debug( - "Ignoring fetched records for {} at offset {} since the current offset is null which means the " - + "bucket has been unsubscribe.", + "Ignoring fetched records for {} at offset {} since the current offset is null which means the bucket has been unsubscribed.", tb, nextInLineFetch.nextFetchOffset()); } else { @@ -191,102 +102,13 @@ private List fetchRecords(CompletedFetch nextInLineFetch, int maxRec return Collections.emptyList(); } - /** Initialize a {@link CompletedFetch} object. */ - private @Nullable CompletedFetch initialize(CompletedFetch completedFetch) { - TableBucket tb = completedFetch.tableBucket; - ApiError error = completedFetch.error; - - try { - if (error.isSuccess()) { - return handleInitializeSuccess(completedFetch); - } else { - handleInitializeErrors(completedFetch, error.error(), error.messageWithFallback()); - return null; - } - } finally { - if (error.isFailure()) { - // we move the bucket to the end if there was an error. This way, - // it's more likely that buckets for the same table can remain together - // (allowing for more efficient serialization). - logScannerStatus.moveBucketToEnd(tb); - } - } - } - - private @Nullable CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) { - TableBucket tb = completedFetch.tableBucket; - long fetchOffset = completedFetch.nextFetchOffset(); - - // we are interested in this fetch only if the beginning offset matches the - // current consumed position. - Long offset = logScannerStatus.getBucketOffset(tb); - if (offset == null) { - LOG.debug( - "Discarding stale fetch response for bucket {} since the expected offset is null which means the bucket has been " - + "unsubscribed.", - tb); - return null; - } - if (offset != fetchOffset) { - LOG.warn( - "Discarding stale fetch response for bucket {} since its offset {} does not match the expected offset {}.", - tb, - fetchOffset, - offset); - return null; - } - - long highWatermark = completedFetch.highWatermark; - if (highWatermark >= 0) { - LOG.trace("Updating high watermark for bucket {} to {}.", tb, highWatermark); - logScannerStatus.updateHighWatermark(tb, highWatermark); - } - - completedFetch.setInitialized(); - return completedFetch; + @Override + protected int recordCount(List fetchedRecords) { + return fetchedRecords.size(); } - private void handleInitializeErrors( - CompletedFetch completedFetch, Errors error, String errorMessage) { - TableBucket tb = completedFetch.tableBucket; - long fetchOffset = completedFetch.nextFetchOffset(); - if (error == Errors.NOT_LEADER_OR_FOLLOWER - || error == Errors.LOG_STORAGE_EXCEPTION - || error == Errors.KV_STORAGE_EXCEPTION - || error == Errors.STORAGE_EXCEPTION - || error == Errors.FENCED_LEADER_EPOCH_EXCEPTION) { - LOG.debug( - "Error in fetch for bucket {}: {}:{}", - tb, - error.exceptionName(), - error.exception(errorMessage)); - metadataUpdater.checkAndUpdateMetadata(tablePath, tb); - } else if (error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION) { - LOG.warn("Received unknown table or bucket error in fetch for bucket {}", tb); - metadataUpdater.checkAndUpdateMetadata(tablePath, tb); - } else if (error == Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION) { - throw new FetchException( - String.format( - "The fetching offset %s is out of range: %s", - fetchOffset, error.exception(errorMessage))); - } else if (error == Errors.AUTHORIZATION_EXCEPTION) { - throw new AuthorizationException(errorMessage); - } else if (error == Errors.UNKNOWN_SERVER_ERROR) { - LOG.warn( - "Unknown server error while fetching offset {} for bucket {}: {}", - fetchOffset, - tb, - error.exception(errorMessage)); - } else if (error == Errors.CORRUPT_MESSAGE) { - throw new FetchException( - String.format( - "Encountered corrupt message when fetching offset %s for bucket %s: %s", - fetchOffset, tb, error.exception(errorMessage))); - } else { - throw new FetchException( - String.format( - "Unexpected error code %s while fetching at offset %s from bucket %s: %s", - error, fetchOffset, tb, error.exception(errorMessage))); - } + @Override + protected ScanRecords toResult(Map> fetchedRecords) { + return new ScanRecords(fetchedRecords); } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index c8f87984b0..f7dd7e9206 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -100,6 +100,7 @@ public class LogFetcher implements Closeable { private final LogScannerStatus logScannerStatus; private final LogFetchBuffer logFetchBuffer; private final LogFetchCollector logFetchCollector; + private final ArrowLogFetchCollector arrowLogFetchCollector; private final RemoteLogDownloader remoteLogDownloader; @GuardedBy("this") @@ -145,6 +146,8 @@ public LogFetcher( this.metadataUpdater = metadataUpdater; this.logFetchCollector = new LogFetchCollector(tablePath, logScannerStatus, conf, metadataUpdater); + this.arrowLogFetchCollector = + new ArrowLogFetchCollector(tablePath, logScannerStatus, conf, metadataUpdater); this.scannerMetricGroup = scannerMetricGroup; this.remoteLogDownloader = new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup); @@ -164,6 +167,10 @@ public ScanRecords collectFetch() { return logFetchCollector.collectFetch(logFetchBuffer); } + public ArrowScanRecords collectArrowFetch() { + return arrowLogFetchCollector.collectFetch(logFetchBuffer); + } + /** * Set up a fetch request for any node that we have assigned buckets for which doesn't already * have an in-flight fetch or pending fetch data. diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index 9c24dbeb77..7c9091043b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -17,12 +17,15 @@ package org.apache.fluss.client.table.scanner.log; +import org.apache.fluss.annotation.Internal; import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; +import org.apache.fluss.client.table.scanner.Scan; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.WakeupException; +import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; @@ -42,6 +45,8 @@ import java.util.ConcurrentModificationException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import java.util.function.Supplier; /** * The default impl of {@link LogScanner}. @@ -55,24 +60,27 @@ @PublicEvolving public class LogScannerImpl implements LogScanner { private static final Logger LOG = LoggerFactory.getLogger(LogScannerImpl.class); - private static final long NO_CURRENT_THREAD = -1L; + private final TablePath tablePath; private final LogScannerStatus logScannerStatus; private final MetadataUpdater metadataUpdater; private final LogFetcher logFetcher; private final long tableId; private final boolean isPartitionedTable; - - private volatile boolean closed = false; + private final boolean arrowLogFormat; + private final boolean isLogTable; + private final boolean hasProjection; + // metrics + private final ScannerMetricGroup scannerMetricGroup; // currentThread holds the threadId of the current thread accessing FlussLogScanner // and is used to prevent multithreaded access private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); // refCount is used to allow reentrant access by the thread who has acquired currentThread. private final AtomicInteger refCount = new AtomicInteger(0); - // metrics - private final ScannerMetricGroup scannerMetricGroup; + + private volatile boolean closed = false; public LogScannerImpl( Configuration conf, @@ -85,6 +93,9 @@ public LogScannerImpl( this.tablePath = tableInfo.getTablePath(); this.tableId = tableInfo.getTableId(); this.isPartitionedTable = tableInfo.isPartitioned(); + this.arrowLogFormat = tableInfo.getTableConfig().getLogFormat() == LogFormat.ARROW; + this.isLogTable = !tableInfo.hasPrimaryKey(); + this.hasProjection = projectedFields != null; // add this table to metadata updater. metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath)); this.logScannerStatus = new LogScannerStatus(); @@ -128,43 +139,34 @@ private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo t @Override public ScanRecords poll(Duration timeout) { - acquireAndEnsureOpen(); - try { - if (!logScannerStatus.prepareToPoll()) { - throw new IllegalStateException("LogScanner is not subscribed any buckets."); - } - - scannerMetricGroup.recordPollStart(System.currentTimeMillis()); - long timeoutNanos = timeout.toNanos(); - long startNanos = System.nanoTime(); - do { - ScanRecords scanRecords = pollForFetches(); - if (scanRecords.isEmpty()) { - try { - if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) { - // logFetcher waits for the timeout and no data in buffer, - // so we return empty - return scanRecords; - } - } catch (WakeupException e) { - // wakeup() is called, we need to return empty - return scanRecords; - } - } else { - // before returning the fetched records, we can send off the next round of - // fetches and avoid block waiting for their responses to enable pipelining - // while the user is handling the fetched records. - logFetcher.sendFetches(); - - return scanRecords; - } - } while (System.nanoTime() - startNanos < timeoutNanos); + return doPoll(timeout, this::pollForFetches, ScanRecords::isEmpty, () -> ScanRecords.EMPTY); + } - return ScanRecords.EMPTY; - } finally { - release(); - scannerMetricGroup.recordPollEnd(System.currentTimeMillis()); + /** + * Polls Arrow record batches for internal callers. + * + *

This method is intentionally kept off the public {@link Scan} API surface for now. + */ + @Internal + public ArrowScanRecords pollRecordBatch(Duration timeout) { + if (!arrowLogFormat) { + throw new UnsupportedOperationException( + "Arrow record batch polling is only supported for tables whose log format is ARROW."); + } + if (!isLogTable) { + throw new UnsupportedOperationException( + "Arrow record batch polling is only supported for log tables. CDC scanning is not supported."); } + if (hasProjection) { + throw new UnsupportedOperationException( + "Arrow record batch polling does not support projection. Please create the scanner without projection."); + } + + return doPoll( + timeout, + this::pollForRecordBatches, + ArrowScanRecords::isEmpty, + () -> ArrowScanRecords.EMPTY); } @Override @@ -178,8 +180,8 @@ public void subscribe(int bucket, long offset) { acquireAndEnsureOpen(); try { TableBucket tableBucket = new TableBucket(tableId, bucket); - this.metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath)); - this.logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset)); + metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath)); + logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset)); } finally { release(); } @@ -199,9 +201,9 @@ public void subscribe(long partitionId, int bucket, long offset) { // we make assumption that the partition id must belong to the current table // if we can't find the partition id from the table path, we'll consider the table // is not exist - this.metadataUpdater.checkAndUpdatePartitionMetadata( + metadataUpdater.checkAndUpdatePartitionMetadata( tablePath, Collections.singleton(partitionId)); - this.logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset)); + logScannerStatus.assignScanBuckets(Collections.singletonMap(tableBucket, offset)); } finally { release(); } @@ -216,7 +218,7 @@ public void unsubscribe(long partitionId, int bucket) { acquireAndEnsureOpen(); try { TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket); - this.logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket)); + logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket)); } finally { release(); } @@ -233,7 +235,7 @@ public void unsubscribe(int bucket) { acquireAndEnsureOpen(); try { TableBucket tableBucket = new TableBucket(tableId, bucket); - this.logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket)); + logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket)); } finally { release(); } @@ -252,12 +254,66 @@ private ScanRecords pollForFetches() { // send any new fetches (won't resend pending fetches). logFetcher.sendFetches(); - return logFetcher.collectFetch(); } + private ArrowScanRecords pollForRecordBatches() { + ArrowScanRecords scanRecords = logFetcher.collectArrowFetch(); + if (!scanRecords.isEmpty()) { + return scanRecords; + } + + // send any new fetches (won't resend pending fetches). + logFetcher.sendFetches(); + return logFetcher.collectArrowFetch(); + } + + /** Shared polling loop for row and Arrow scan results. */ + private T doPoll( + Duration timeout, + Supplier pollForFetches, + Predicate isEmpty, + Supplier emptyResult) { + acquireAndEnsureOpen(); + try { + if (!logScannerStatus.prepareToPoll()) { + throw new IllegalStateException("LogScanner is not subscribed any buckets."); + } + + scannerMetricGroup.recordPollStart(System.currentTimeMillis()); + long timeoutNanos = timeout.toNanos(); + long startNanos = System.nanoTime(); + do { + T scanRecords = pollForFetches.get(); + if (isEmpty.test(scanRecords)) { + try { + if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) { + // logFetcher waits for the timeout and no data in buffer, + // so we return empty + return scanRecords; + } + } catch (WakeupException e) { + // wakeup() is called, we need to return empty + return scanRecords; + } + } else { + // before returning the fetched records, we can send off the next round of + // fetches and avoid block waiting for their responses to enable pipelining + // while the user is handling the fetched records. + logFetcher.sendFetches(); + return scanRecords; + } + } while (System.nanoTime() - startNanos < timeoutNanos); + + return emptyResult.get(); + } finally { + release(); + scannerMetricGroup.recordPollEnd(System.currentTimeMillis()); + } + } + /** - * Acquire the light lock and ensure that the consumer hasn't been closed. + * Acquire the light lock and ensure that the scanner hasn't been closed. * * @throws IllegalStateException If the scanner has been closed */ @@ -277,8 +333,8 @@ private void acquireAndEnsureOpen() { * @throws ConcurrentModificationException if another thread already has the lock */ private void acquire() { - final Thread thread = Thread.currentThread(); - final long threadId = thread.getId(); + Thread thread = Thread.currentThread(); + long threadId = thread.getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) { throw new ConcurrentModificationException( @@ -295,7 +351,7 @@ private void acquire() { refCount.incrementAndGet(); } - /** Release the light lock protecting the consumer from multithreaded access. */ + /** Release the light lock protecting the scanner from multithreaded access. */ private void release() { if (refCount.decrementAndGet() == 0) { currentThread.set(NO_CURRENT_THREAD); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java index cfe215fb0b..f760b66264 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java @@ -23,21 +23,27 @@ import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.exception.FetchException; +import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.ArrowBatchData; import org.apache.fluss.record.ChangeType; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -86,6 +92,85 @@ void testPoll() throws Exception { } } + @Test + void testPollArrowBatchesWithSchemaEvolution() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_arrow_batches_with_schema_evolution"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(1) + .logFormat(LogFormat.ARROW) + .build(); + createTable(tablePath, tableDescriptor, false); + + // write 3 rows with the original schema (a: INT, b: STRING) + try (Table table = conn.getTable(tablePath)) { + AppendWriter appendWriter = table.newAppend().createWriter(); + for (int i = 0; i < 3; i++) { + appendWriter.append(row(i, "value-" + i)); + } + appendWriter.flush(); + } + + // add column c: STRING + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "c", + DataTypes.STRING(), + null, + TableChange.ColumnPosition.last())), + false) + .get(); + + // write 3 more rows with the evolved schema (a: INT, b: STRING, c: STRING) + try (Table table = conn.getTable(tablePath)) { + AppendWriter appendWriter = table.newAppend().createWriter(); + for (int i = 3; i < 6; i++) { + appendWriter.append(row(i, "value-" + i, "extra-" + i)); + } + appendWriter.flush(); + + int totalRecords = 6; + try (LogScannerImpl scanner = (LogScannerImpl) table.newScan().createLogScanner()) { + scanner.subscribeFromBeginning(0); + + int count = 0; + long deadline = System.nanoTime() + Duration.ofSeconds(30).toNanos(); + while (count < totalRecords) { + assertThat(System.nanoTime()) + .as("Timed out waiting for %s records, got %s", totalRecords, count) + .isLessThan(deadline); + ArrowScanRecords records = scanner.pollRecordBatch(Duration.ofSeconds(1)); + for (ArrowBatchData batch : records) { + try (ArrowBatchData b = batch) { + IntVector intVector = (IntVector) b.getVectorSchemaRoot().getVector(0); + VarCharVector stringVector = + (VarCharVector) b.getVectorSchemaRoot().getVector(1); + VarCharVector extraVector = + (VarCharVector) b.getVectorSchemaRoot().getVector(2); + for (int rowId = 0; rowId < b.getRecordCount(); rowId++) { + assertThat(intVector.get(rowId)).isEqualTo(count); + assertThat(stringVector.getObject(rowId).toString()) + .isEqualTo("value-" + count); + if (count < 3) { + assertThat(extraVector.isNull(rowId)).isTrue(); + } else { + assertThat(extraVector.getObject(rowId).toString()) + .isEqualTo("extra-" + count); + } + assertThat(b.getLogOffset(rowId)).isEqualTo(count); + count++; + } + } + } + } + assertThat(count).isEqualTo(totalRecords); + } + } + } + @Test void testPollWhileCreateTableNotReady() throws Exception { // create one table with 30 buckets. diff --git a/fluss-common/pom.xml b/fluss-common/pom.xml index b51017e086..771a9ef2ed 100644 --- a/fluss-common/pom.xml +++ b/fluss-common/pom.xml @@ -62,6 +62,20 @@ fluss-shaded-arrow + + org.apache.arrow + arrow-vector + ${arrow.version} + provided + + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + provided + + at.yawk.lz4 diff --git a/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedArrowCompressionFactory.java b/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedArrowCompressionFactory.java new file mode 100644 index 0000000000..0e7da37bbf --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedArrowCompressionFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.compression; + +import org.apache.fluss.annotation.Internal; + +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; + +/** Unshaded Arrow compression factory for scanner/read path. */ +@Internal +public class UnshadedArrowCompressionFactory implements CompressionCodec.Factory { + + public static final UnshadedArrowCompressionFactory INSTANCE = + new UnshadedArrowCompressionFactory(); + + @Override + public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { + switch (codecType) { + case LZ4_FRAME: + return new UnshadedLz4ArrowCompressionCodec(); + case ZSTD: + return new UnshadedZstdArrowCompressionCodec(); + case NO_COMPRESSION: + return NoCompressionCodec.INSTANCE; + default: + throw new IllegalArgumentException("Compression type not supported: " + codecType); + } + } + + @Override + public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int compressionLevel) { + switch (codecType) { + case LZ4_FRAME: + return new UnshadedLz4ArrowCompressionCodec(); + case ZSTD: + return new UnshadedZstdArrowCompressionCodec(compressionLevel); + case NO_COMPRESSION: + return NoCompressionCodec.INSTANCE; + default: + throw new IllegalArgumentException("Compression type not supported: " + codecType); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedLz4ArrowCompressionCodec.java b/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedLz4ArrowCompressionCodec.java new file mode 100644 index 0000000000..bc41aa802d --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedLz4ArrowCompressionCodec.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.compression; + +import org.apache.fluss.utils.IOUtils; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.compression.AbstractCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** Unshaded Arrow compression codec for the LZ4 algorithm. */ +public class UnshadedLz4ArrowCompressionCodec extends AbstractCompressionCodec { + @Override + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + checkArgument( + uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit"); + + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + uncompressedBuffer.getBytes(0, inBytes); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FlussLZ4BlockOutputStream(baos)) { + IOUtils.copyBytes(in, out); + } catch (IOException e) { + throw new RuntimeException(e); + } + + byte[] outBytes = baos.toByteArray(); + ArrowBuf compressedBuffer = + allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + return compressedBuffer; + } + + @Override + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + checkArgument( + compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit"); + + long decompressedLength = readUncompressedLength(compressedBuffer); + ByteBuffer inByteBuffer = + compressedBuffer.nioBuffer( + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, + (int) + (compressedBuffer.writerIndex() + - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)); + ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); + try (InputStream in = new FlussLZ4BlockInputStream(inByteBuffer)) { + IOUtils.copyBytes(in, out); + } catch (IOException e) { + throw new RuntimeException(e); + } + + byte[] outBytes = out.toByteArray(); + ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); + decompressedBuffer.setBytes(0, outBytes); + decompressedBuffer.writerIndex(decompressedLength); + return decompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.LZ4_FRAME; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedZstdArrowCompressionCodec.java b/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedZstdArrowCompressionCodec.java new file mode 100644 index 0000000000..803889521b --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/compression/UnshadedZstdArrowCompressionCodec.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.compression; + +import com.github.luben.zstd.Zstd; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.compression.AbstractCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; + +import java.nio.ByteBuffer; + +/** Unshaded Arrow compression codec for the Zstd algorithm. */ +public class UnshadedZstdArrowCompressionCodec extends AbstractCompressionCodec { + private static final int DEFAULT_COMPRESSION_LEVEL = 3; + private final int compressionLevel; + + public UnshadedZstdArrowCompressionCodec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + public UnshadedZstdArrowCompressionCodec(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + @Override + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + long maxSize = Zstd.compressBound(uncompressedBuffer.writerIndex()); + ByteBuffer uncompressedDirectBuffer = uncompressedBuffer.nioBuffer(); + + long compressedSize = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + maxSize; + ArrowBuf compressedBuffer = allocator.buffer(compressedSize); + ByteBuffer compressedDirectBuffer = + compressedBuffer.nioBuffer( + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, (int) maxSize); + + long bytesWritten = + Zstd.compressDirectByteBuffer( + compressedDirectBuffer, + 0, + (int) maxSize, + uncompressedDirectBuffer, + 0, + (int) uncompressedBuffer.writerIndex(), + compressionLevel); + + if (Zstd.isError(bytesWritten)) { + compressedBuffer.close(); + throw new RuntimeException("Error compressing: " + Zstd.getErrorName(bytesWritten)); + } + + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + bytesWritten); + return compressedBuffer; + } + + @Override + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + long decompressedLength = readUncompressedLength(compressedBuffer); + + ByteBuffer compressedDirectBuffer = compressedBuffer.nioBuffer(); + ArrowBuf uncompressedBuffer = allocator.buffer(decompressedLength); + ByteBuffer uncompressedDirectBuffer = + uncompressedBuffer.nioBuffer(0, (int) decompressedLength); + + long decompressedSize = + Zstd.decompressDirectByteBuffer( + uncompressedDirectBuffer, + 0, + (int) decompressedLength, + compressedDirectBuffer, + (int) CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, + (int) + (compressedBuffer.writerIndex() + - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)); + if (Zstd.isError(decompressedSize)) { + uncompressedBuffer.close(); + throw new RuntimeException( + "Error decompressing: " + Zstd.getErrorName(decompressedSize)); + } + + if (decompressedLength != decompressedSize) { + uncompressedBuffer.close(); + throw new RuntimeException( + "Expected != actual decompressed length: " + + decompressedLength + + " != " + + decompressedSize); + } + uncompressedBuffer.writerIndex(decompressedLength); + return uncompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.ZSTD; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java b/fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java new file mode 100644 index 0000000000..55806cea38 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.annotation.Internal; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Holds a scanned Arrow batch together with the log metadata of the batch. + * + *

This class only supports append-only log tables. CDC tables are not supported. + * + *

The caller must close this object after use in order to release the underlying Arrow memory. + */ +@Internal +public class ArrowBatchData implements AutoCloseable { + + private final VectorSchemaRoot vectorSchemaRoot; + private final BufferAllocator allocator; + private final long baseLogOffset; + private final long timestamp; + private final int schemaId; + + public ArrowBatchData( + VectorSchemaRoot vectorSchemaRoot, + BufferAllocator allocator, + long baseLogOffset, + long timestamp, + int schemaId) { + this.vectorSchemaRoot = checkNotNull(vectorSchemaRoot, "vectorSchemaRoot must not be null"); + this.allocator = checkNotNull(allocator, "allocator must not be null"); + this.baseLogOffset = baseLogOffset; + this.timestamp = timestamp; + this.schemaId = schemaId; + } + + /** Returns the Arrow vectors of this batch. */ + public VectorSchemaRoot getVectorSchemaRoot() { + return vectorSchemaRoot; + } + + /** Returns the schema id of this batch. */ + public int getSchemaId() { + return schemaId; + } + + /** Returns the base log offset of this batch. */ + public long getBaseLogOffset() { + return baseLogOffset; + } + + /** Returns the commit timestamp of this batch. */ + public long getTimestamp() { + return timestamp; + } + + /** Returns the number of rows in this batch. */ + public int getRecordCount() { + return vectorSchemaRoot.getRowCount(); + } + + /** Returns the log offset of the given row. */ + public long getLogOffset(int rowId) { + validateRowId(rowId); + return baseLogOffset + rowId; + } + + private void validateRowId(int rowId) { + checkArgument( + rowId >= 0 && rowId < getRecordCount(), + "rowId must be in [0, %s), but is %s", + getRecordCount(), + rowId); + } + + @Override + public void close() { + try { + vectorSchemaRoot.close(); + } finally { + allocator.close(); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ArrowRecordBatchContext.java b/fluss-common/src/main/java/org/apache/fluss/record/ArrowRecordBatchContext.java new file mode 100644 index 0000000000..64cf19ba42 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/ArrowRecordBatchContext.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.memory.MemorySegment; + +/** Internal context for loading Arrow record batches with unshaded Arrow resources. */ +@Internal +interface ArrowRecordBatchContext extends LogRecordBatch.ReadContext { + + /** Creates a batch-scoped access wrapper for unshaded Arrow resources. */ + UnshadedArrowBatchAccess createUnshadedArrowBatchAccess(int schemaId); + + /** Internal wrapper that hides unshaded Arrow types from shared signatures. */ + @Internal + interface UnshadedArrowBatchAccess extends AutoCloseable { + + /** Loads one Arrow batch from the given memory segment into the internal read root. */ + void loadArrowBatch(MemorySegment segment, int arrowOffset, int arrowLength); + + /** + * Applies schema-evolution projection (if needed) internally and builds the final {@link + * ArrowBatchData}, transferring ownership to the caller. + */ + ArrowBatchData createArrowBatchData(long baseLogOffset, long timestamp, int schemaId); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java index 4d9c7fd040..692cf0c9ac 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java @@ -31,6 +31,7 @@ import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ArrowUtils; import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.IOUtils; import org.apache.fluss.utils.MurmurHashUtils; import org.apache.fluss.utils.crc.Crc32C; @@ -62,6 +63,7 @@ import static org.apache.fluss.record.LogRecordBatchFormat.statisticsDataOffset; import static org.apache.fluss.record.LogRecordBatchFormat.statisticsLengthOffset; import static org.apache.fluss.record.LogRecordBatchFormat.writeClientIdOffset; +import static org.apache.fluss.utils.Preconditions.checkArgument; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -262,6 +264,38 @@ public CloseableIterator records(ReadContext context) { } } + @Override + public ArrowBatchData loadArrowBatch(ReadContext context) { + if (context.getLogFormat() != LogFormat.ARROW) { + throw new UnsupportedOperationException( + "loadArrowBatch is only supported for ARROW log format."); + } + + int schemaId = schemaId(); + checkArgument( + context instanceof ArrowRecordBatchContext, + "Arrow batch loading requires context to implement ArrowRecordBatchContext, but is %s.", + context.getClass().getName()); + ArrowRecordBatchContext arrowRecordBatchContext = (ArrowRecordBatchContext) context; + checkArgument(isAppendOnly(), "Arrow batch loading only supports append-only batches."); + ArrowRecordBatchContext.UnshadedArrowBatchAccess batchAccess = + arrowRecordBatchContext.createUnshadedArrowBatchAccess(schemaId); + + try { + int recordsDataOffset = recordsDataOffset(); + int arrowOffset = position + recordsDataOffset; + int arrowLength = sizeInBytes() - recordsDataOffset; + batchAccess.loadArrowBatch(segment, arrowOffset, arrowLength); + ArrowBatchData arrowBatchData = + batchAccess.createArrowBatchData(baseLogOffset(), commitTimestamp(), schemaId); + batchAccess = null; + return arrowBatchData; + } catch (Throwable t) { + IOUtils.closeQuietly(batchAccess); + throw t; + } + } + @Override public boolean equals(Object o) { if (this == o) { @@ -352,7 +386,7 @@ private CloseableIterator columnRecordIterator( VectorSchemaRoot root, BufferAllocator allocator, long timestamp) { - boolean isAppendOnly = (attributes() & APPEND_ONLY_FLAG_MASK) > 0; + boolean isAppendOnly = isAppendOnly(); int recordsDataOffset = recordsDataOffset(); if (isAppendOnly) { // append only batch, no change type vector, @@ -388,6 +422,10 @@ protected ChangeType getChangeType(int rowId) { } } + private boolean isAppendOnly() { + return (attributes() & APPEND_ONLY_FLAG_MASK) > 0; + } + /** The basic implementation for Arrow log record iterator. */ private abstract class ArrowLogRecordIterator extends LogRecordIterator { private final ArrowReader reader; diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java index fca1a2c49c..9e15a508ef 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java @@ -179,6 +179,11 @@ public CloseableIterator records(ReadContext context) { return loadFullBatch().records(context); } + @Override + public ArrowBatchData loadArrowBatch(ReadContext context) { + return loadFullBatch().loadArrowBatch(context); + } + @Override public boolean isValid() { return loadFullBatch().isValid(); diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java index d1cec1c9e4..9f2046c484 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java @@ -173,6 +173,16 @@ default boolean hasWriterId() { */ CloseableIterator records(ReadContext context); + /** + * Loads the underlying Arrow batch directly. + * + *

This method is only supported for Arrow log batches. + */ + default ArrowBatchData loadArrowBatch(ReadContext context) { + throw new UnsupportedOperationException( + "loadArrowBatch is only supported for ARROW log format."); + } + /** The read context of a {@link LogRecordBatch} to read records. */ interface ReadContext { diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 2b0f695ecf..42324edd5f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -18,6 +18,7 @@ package org.apache.fluss.record; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; @@ -31,8 +32,10 @@ import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ArrowUtils; +import org.apache.fluss.utils.IOUtils; import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.Projection; +import org.apache.fluss.utils.UnshadedArrowReadUtils; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -41,11 +44,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.IntStream; -import static org.apache.fluss.utils.Preconditions.checkNotNull; - /** A simple implementation for {@link LogRecordBatch.ReadContext}. */ @ThreadSafe -public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoCloseable { +public class LogRecordReadContext + implements LogRecordBatch.ReadContext, ArrowRecordBatchContext, AutoCloseable { // the log format of the table private final LogFormat logFormat; @@ -54,14 +56,16 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo // the static schemaId of the table, should support dynamic schema evolution in the future private final int targetSchemaId; // the Arrow memory buffer allocator for the table, should be null if not ARROW log format - @Nullable private final BufferAllocator bufferAllocator; - // the final selected fields of the read data + @Nullable private volatile BufferAllocator bufferAllocator; + @Nullable private volatile AutoCloseable unshadedBufferAllocator; private final FieldGetter[] selectedFieldGetters; // whether the projection is push downed to the server side and the returned data is pruned. private final boolean projectionPushDowned; private final SchemaGetter schemaGetter; private final ConcurrentHashMap vectorSchemaRootMap = MapUtils.newConcurrentHashMap(); + private final Object arrowResourceLock = new Object(); + private final Object unshadedArrowResourceLock = new Object(); public static LogRecordReadContext createReadContext( TableInfo tableInfo, @@ -114,14 +118,13 @@ private static LogRecordReadContext createArrowReadContext( int[] selectedFields, boolean projectionPushDowned, SchemaGetter schemaGetter) { - // TODO: use a more reasonable memory limit - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, dataRowType, schemaId, - allocator, + null, + selectedFields, fieldGetters, projectionPushDowned, schemaGetter); @@ -187,7 +190,14 @@ public static LogRecordReadContext createIndexedReadContext( FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields); // for INDEXED log format, the projection is NEVER push downed to the server side return new LogRecordReadContext( - LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, false, schemaGetter); + LogFormat.INDEXED, + rowType, + schemaId, + null, + selectedFields, + fieldGetters, + false, + schemaGetter); } /** @@ -202,7 +212,14 @@ public static LogRecordReadContext createCompactedRowReadContext( FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields); // for COMPACTED log format, the projection is NEVER push downed to the server side return new LogRecordReadContext( - LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters, false, null); + LogFormat.COMPACTED, + rowType, + schemaId, + null, + selectedFields, + fieldGetters, + false, + null); } private LogRecordReadContext( @@ -210,6 +227,7 @@ private LogRecordReadContext( RowType targetDataRowType, int targetSchemaId, BufferAllocator bufferAllocator, + int[] selectedFields, FieldGetter[] selectedFieldGetters, boolean projectionPushDowned, SchemaGetter schemaGetter) { @@ -247,6 +265,22 @@ public boolean isProjectionPushDowned() { return projectionPushDowned; } + /** + * Returns the column index mapping for schema evolution, or {@code null} if the batch schema + * matches the current schema and no remapping is needed. + * + *

Each entry maps an output column to its position in the batch's schema. A value of {@code + * -1} means the column does not exist in the batch's schema and should be filled with nulls. + */ + @Nullable + private int[] getSchemaEvolutionMapping(int schemaId) { + ProjectedRow projectedRow = getOutputProjectedRow(schemaId); + if (projectedRow == null) { + return null; + } + return projectedRow.getIndexMapping(); + } + @Override public VectorSchemaRoot getVectorSchemaRoot(int schemaId) { if (logFormat != LogFormat.ARROW) { @@ -254,12 +288,19 @@ public VectorSchemaRoot getVectorSchemaRoot(int schemaId) { "Only Arrow log format provides vector schema root."); } - RowType rowType = getRowType(schemaId); - return vectorSchemaRootMap.computeIfAbsent( - schemaId, - (id) -> - VectorSchemaRoot.create( - ArrowUtils.toArrowSchema(rowType), bufferAllocator)); + synchronized (arrowResourceLock) { + VectorSchemaRoot vectorSchemaRoot = vectorSchemaRootMap.get(schemaId); + if (vectorSchemaRoot != null) { + return vectorSchemaRoot; + } + + RowType rowType = getRowType(schemaId); + vectorSchemaRoot = + VectorSchemaRoot.create( + ArrowUtils.toArrowSchema(rowType), getOrCreateBufferAllocator()); + vectorSchemaRootMap.put(schemaId, vectorSchemaRoot); + return vectorSchemaRoot; + } } @Override @@ -267,8 +308,16 @@ public BufferAllocator getBufferAllocator() { if (logFormat != LogFormat.ARROW) { throw new IllegalArgumentException("Only Arrow log format provides buffer allocator."); } - checkNotNull(bufferAllocator, "The buffer allocator is not available."); - return bufferAllocator; + return getOrCreateBufferAllocator(); + } + + @Override + public UnshadedArrowBatchAccess createUnshadedArrowBatchAccess(int schemaId) { + if (logFormat != LogFormat.ARROW) { + throw new IllegalArgumentException( + "Only Arrow log format provides unshaded Arrow resources."); + } + return new UnshadedArrowBatchAccessImpl(schemaId); } @Nullable @@ -284,9 +333,104 @@ public ProjectedRow getOutputProjectedRow(int schemaId) { } public void close() { - vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close); - if (bufferAllocator != null) { - bufferAllocator.close(); + synchronized (arrowResourceLock) { + vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close); + vectorSchemaRootMap.clear(); + if (bufferAllocator != null) { + bufferAllocator.close(); + bufferAllocator = null; + } + } + + synchronized (unshadedArrowResourceLock) { + IOUtils.closeQuietly(unshadedBufferAllocator); + unshadedBufferAllocator = null; + } + } + + private BufferAllocator getOrCreateBufferAllocator() { + BufferAllocator allocator = bufferAllocator; + if (allocator != null) { + return allocator; + } + + synchronized (arrowResourceLock) { + if (bufferAllocator == null) { + bufferAllocator = new RootAllocator(Long.MAX_VALUE); + } + return bufferAllocator; + } + } + + private AutoCloseable getOrCreateUnshadedBufferAllocator() { + AutoCloseable allocator = unshadedBufferAllocator; + if (allocator != null) { + return allocator; + } + + synchronized (unshadedArrowResourceLock) { + if (unshadedBufferAllocator == null) { + unshadedBufferAllocator = new org.apache.arrow.memory.RootAllocator(Long.MAX_VALUE); + } + return unshadedBufferAllocator; + } + } + + private final class UnshadedArrowBatchAccessImpl implements UnshadedArrowBatchAccess { + private final org.apache.arrow.memory.BufferAllocator allocator; + private org.apache.arrow.vector.VectorSchemaRoot readRoot; + private org.apache.arrow.vector.VectorSchemaRoot outputRoot; + private boolean ownershipTransferred; + + private UnshadedArrowBatchAccessImpl(int schemaId) { + org.apache.arrow.memory.BufferAllocator parentAllocator = + (org.apache.arrow.memory.BufferAllocator) getOrCreateUnshadedBufferAllocator(); + this.allocator = + parentAllocator.newChildAllocator("log-record-read-batch", 0, Long.MAX_VALUE); + this.readRoot = + org.apache.arrow.vector.VectorSchemaRoot.create( + org.apache.fluss.utils.UnshadedArrowReadUtils.toArrowSchema( + getRowType(schemaId)), + allocator); + this.outputRoot = readRoot; + } + + @Override + public void loadArrowBatch(MemorySegment segment, int arrowOffset, int arrowLength) { + UnshadedArrowReadUtils.loadArrowBatch( + segment, arrowOffset, arrowLength, readRoot, allocator); + } + + @Override + public ArrowBatchData createArrowBatchData( + long baseLogOffset, long timestamp, int schemaId) { + int[] schemaMapping = getSchemaEvolutionMapping(schemaId); + if (schemaMapping != null) { + outputRoot = + UnshadedArrowReadUtils.projectVectorSchemaRoot( + readRoot, dataRowType, schemaMapping, allocator); + readRoot.close(); + readRoot = null; + } + ArrowBatchData arrowBatchData = + new ArrowBatchData(outputRoot, allocator, baseLogOffset, timestamp, schemaId); + ownershipTransferred = true; + outputRoot = null; + readRoot = null; + return arrowBatchData; + } + + @Override + public void close() { + if (ownershipTransferred) { + return; + } + + IOUtils.closeQuietly(outputRoot); + if (outputRoot != readRoot) { + IOUtils.closeQuietly(readRoot); + } + IOUtils.closeQuietly(allocator); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java b/fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java new file mode 100644 index 0000000000..7798f8ef6e --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.record; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.util.Collections2; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.TypeLayout; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.CompressionUtil.CodecType; +import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Field; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** Unshaded variant of {@link FlussVectorLoader} for scanner/read path. */ +public class UnshadedFlussVectorLoader { + private final VectorSchemaRoot root; + private final CompressionCodec.Factory factory; + private boolean decompressionNeeded; + + public UnshadedFlussVectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) { + this.root = root; + this.factory = factory; + } + + public void load(ArrowRecordBatch recordBatch) { + Iterator buffers = recordBatch.getBuffers().iterator(); + Iterator nodes = recordBatch.getNodes().iterator(); + CompressionUtil.CodecType codecType = + CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec()); + this.decompressionNeeded = codecType != CodecType.NO_COMPRESSION; + CompressionCodec codec = + this.decompressionNeeded + ? this.factory.createCodec(codecType) + : NoCompressionCodec.INSTANCE; + + for (FieldVector fieldVector : this.root.getFieldVectors()) { + this.loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); + } + + this.root.setRowCount(recordBatch.getLength()); + if (nodes.hasNext() || buffers.hasNext()) { + throw new IllegalArgumentException( + "not all nodes and buffers were consumed. nodes: " + + Collections2.toString(nodes) + + " buffers: " + + Collections2.toString(buffers)); + } + } + + private void loadBuffers( + FieldVector vector, + Field field, + Iterator buffers, + Iterator nodes, + CompressionCodec codec) { + Preconditions.checkArgument( + nodes.hasNext(), "no more field nodes for field %s and vector %s", field, vector); + ArrowFieldNode fieldNode = nodes.next(); + int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType()); + List ownBuffers = new ArrayList<>(bufferLayoutCount); + + try { + for (int j = 0; j < bufferLayoutCount; ++j) { + ArrowBuf nextBuf = buffers.next(); + ArrowBuf bufferToAdd = + nextBuf.writerIndex() > 0L + ? codec.decompress(vector.getAllocator(), nextBuf) + : nextBuf; + ownBuffers.add(bufferToAdd); + if (this.decompressionNeeded) { + nextBuf.getReferenceManager().retain(); + } + } + vector.loadFieldBuffers(fieldNode, ownBuffers); + } catch (RuntimeException e) { + throw new IllegalArgumentException( + "Could not load buffers for field " + + field + + ". error message: " + + e.getMessage(), + e); + } finally { + if (this.decompressionNeeded) { + for (ArrowBuf buf : ownBuffers) { + buf.close(); + } + } + } + + List children = field.getChildren(); + if (!children.isEmpty()) { + List childrenFromFields = vector.getChildrenFromFields(); + Preconditions.checkArgument( + children.size() == childrenFromFields.size(), + "should have as many children as in the schema: found %s expected %s", + childrenFromFields.size(), + children.size()); + + for (int i = 0; i < childrenFromFields.size(); ++i) { + Field child = children.get(i); + FieldVector fieldVector = childrenFromFields.get(i); + this.loadBuffers(fieldVector, child, buffers, nodes, codec); + } + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java index 25656204d2..0af0f40bdc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java @@ -19,11 +19,10 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.metadata.Schema; +import org.apache.fluss.utils.SchemaUtil; import java.util.Arrays; -import static org.apache.fluss.utils.SchemaUtil.getIndexMapping; - /** * An implementation of {@link InternalRow} which provides a projected view of the underlying {@link * InternalRow}. @@ -190,10 +189,15 @@ public static ProjectedRow from(int[] projection) { } public static ProjectedRow from(Schema originSchema, Schema expectedSchema) { - int[] indexMapping = getIndexMapping(originSchema, expectedSchema); + int[] indexMapping = SchemaUtil.getIndexMapping(originSchema, expectedSchema); return new ProjectedRow(indexMapping); } + /** Returns the index mapping used by this projected row. */ + public int[] getIndexMapping() { + return Arrays.copyOf(indexMapping, indexMapping.length); + } + /** * Returns the underlying row before column projection. * diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java new file mode 100644 index 0000000000..705e4847c8 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.utils; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.compression.UnshadedArrowCompressionFactory; +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.record.UnshadedFlussVectorLoader; +import org.apache.fluss.types.RowType; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BaseVariableWidthVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** Utilities for loading and projecting Arrow scan batches with unshaded Arrow classes. */ +@Internal +public final class UnshadedArrowReadUtils { + + private UnshadedArrowReadUtils() {} + + public static Schema toArrowSchema(RowType rowType) { + try { + return Schema.fromJSON(ArrowUtils.toArrowSchema(rowType).toJson()); + } catch (IOException e) { + throw new RuntimeException("Failed to convert Arrow schema to unshaded schema.", e); + } + } + + public static void loadArrowBatch( + MemorySegment segment, + int arrowOffset, + int arrowLength, + VectorSchemaRoot schemaRoot, + BufferAllocator allocator) { + ByteBuffer arrowBatchBuffer = segment.wrap(arrowOffset, arrowLength); + try (ReadChannel channel = + new ReadChannel(new ByteBufferReadableChannel(arrowBatchBuffer)); + ArrowRecordBatch batch = + MessageSerializer.deserializeRecordBatch(channel, allocator)) { + new UnshadedFlussVectorLoader(schemaRoot, UnshadedArrowCompressionFactory.INSTANCE) + .load(batch); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize ArrowRecordBatch.", e); + } + } + + public static VectorSchemaRoot projectVectorSchemaRoot( + VectorSchemaRoot sourceRoot, + RowType targetRowType, + int[] columnProjection, + BufferAllocator allocator) { + int rowCount = sourceRoot.getRowCount(); + Schema targetSchema = toArrowSchema(targetRowType); + VectorSchemaRoot targetRoot = VectorSchemaRoot.create(targetSchema, allocator); + for (int i = 0; i < columnProjection.length; i++) { + FieldVector targetVector = targetRoot.getVector(i); + initFieldVector(targetVector, rowCount); + if (columnProjection[i] < 0) { + fillNullVector(targetVector, rowCount); + } else { + FieldVector sourceVector = sourceRoot.getVector(columnProjection[i]); + // TODO: Optimize this projection path with Arrow vector transfer/copy utilities + // when we only need to reuse or reorder existing columns. The current row-by-row + // copy is acceptable for now because this path is not a hot path and is mainly + // used by the tiering service, which recreates scanners instead of scanning in a + // tight loop. + for (int rowId = 0; rowId < rowCount; rowId++) { + targetVector.copyFromSafe(rowId, rowId, sourceVector); + } + targetVector.setValueCount(rowCount); + } + } + targetRoot.setRowCount(rowCount); + return targetRoot; + } + + private static void fillNullVector(FieldVector fieldVector, int rowCount) { + for (int i = 0; i < rowCount; i++) { + fieldVector.setNull(i); + } + fieldVector.setValueCount(rowCount); + } + + private static void initFieldVector(FieldVector fieldVector, int rowCount) { + fieldVector.setInitialCapacity(rowCount); + if (fieldVector instanceof BaseFixedWidthVector) { + ((BaseFixedWidthVector) fieldVector).allocateNew(rowCount); + } else if (fieldVector instanceof BaseVariableWidthVector) { + ((BaseVariableWidthVector) fieldVector).allocateNew(rowCount); + } else if (fieldVector instanceof ListVector) { + ListVector listVector = (ListVector) fieldVector; + listVector.allocateNew(); + FieldVector dataVector = listVector.getDataVector(); + if (dataVector != null) { + initFieldVector(dataVector, rowCount); + } + } else { + fieldVector.allocateNew(); + } + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java index 63b3f52dbb..b0c0c214a6 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java @@ -648,6 +648,14 @@ void testStatisticsWithDifferentChangeTypes() throws Exception { readContext.close(); } + private org.apache.arrow.memory.BufferAllocator getAllocator( + ArrowRecordBatchContext.UnshadedArrowBatchAccess batchAccess) throws Exception { + java.lang.reflect.Field allocatorField = + batchAccess.getClass().getDeclaredField("allocator"); + allocatorField.setAccessible(true); + return (org.apache.arrow.memory.BufferAllocator) allocatorField.get(batchAccess); + } + private static List compressionInfos() { return Arrays.asList( new ArrowCompressionInfo(ArrowCompressionType.LZ4_FRAME, -1),