Add V2 batch format with statistics collection #2886
platinumhamburg wants to merge 1 commit into apache:main from
Conversation
Force-pushed from af26717 to 836948c (compare).
Pull request overview
This PR introduces a new V2 Arrow log record batch format that can embed per-column min/max + null-count statistics between the batch header and record payload, enabling future filter pushdown improvements (Issue #2885 / FIP-10).
Changes:
- Add statistics collection/serialization/parsing APIs and implementations (collector/writer/parser + batch-access API).
- Extend Arrow log batch building, reading, and projection logic to account for a V2 layout with an optional statistics section and a StatisticsLength header field.
- Add table-level configuration (table.statistics.columns) utilities and validation, and wire statistics collection into the client write path.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| fluss-server/src/main/java/org/apache/fluss/server/kv/wal/ArrowWalBuilder.java | Adapts builder call sites to new statistics-capable Arrow batch builder signature. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Validates table statistics configuration on CREATE TABLE. |
| fluss-common/src/test/java/org/apache/fluss/record/TestData.java | Adds schemas/data for statistics-related tests. |
| fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java | Extends batch builder tests to cover V2 + statistics. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTestUtils.java | Adds reusable utilities for generating batches with statistics in tests. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTest.java | Adds end-to-end tests for statistics extraction/caching across batch impls. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsParserTest.java | Adds tests for statistics parsing/validation helpers. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsCollectorTest.java | Adds tests for collector behavior across types/nulls/mappings. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java | Adds V2 header/offset assertions and stats-aware offsets. |
| fluss-common/src/test/java/org/apache/fluss/config/StatisticsConfigUtilsTest.java | Adds tests for statistics config validation. |
| fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java | Adds isBinaryType helper used by stats config/mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java | Exposes RowType schema via getSchema() for stats wiring. |
| fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java | Adds AlignedRow.from(RowType, InternalRow) conversion helper. |
| fluss-common/src/main/java/org/apache/fluss/record/bytesview/MultiBytesView.java | Enhances builder API (addBytes(BytesView), isEmpty) and improves file-region merging. |
| fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java | Implements V2 batch building with optional embedded statistics + CRC handling changes. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsWriter.java | New: serializes statistics in a compact schema-aware format. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsParser.java | New: parses/validates statistics payloads from multiple memory sources. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsCollector.java | New: collects per-column min/max/null-counts during batch build. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatistics.java | New: statistics interface exposed from LogRecordBatch. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java | Adds V2 constants/layout helpers, statistics offsets, and header mutation helper. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java | Adds getStatistics(ReadContext) API to batches. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java | Updates projection to skip stats section for V2 and clear stats header fields. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java | Adds stats parsing and provides optional “trim stats” BytesView generation for V2. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java | New: zero-copy statistics view with full-schema wrappers + mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java | Updates record decoding offsets for V2 (stats-aware) and implements statistics parsing/caching. |
| fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java | Computes and caches stats column index mappings based on table config. |
| fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java | Adds statistics config accessors and enablement checks. |
| fluss-common/src/main/java/org/apache/fluss/config/StatisticsConfigUtils.java | New: validates table.statistics.columns against schema + supported types. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds table.statistics.columns option definition/documentation. |
| fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java | Updates tests for ArrowLogWriteBatch constructor signature. |
| fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java | Wires per-table statistics collection into batch creation when enabled. |
| fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java | Passes statistics collector into Arrow batch builder. |
```java
+ "Empty string ('') (default) means disable statistics collection completely. "
+ "The value '*' means collect statistics for all non-binary columns. "
```
TABLE_STATISTICS_COLUMNS sets defaultValue("*"), but the description says "Empty string ('') (default) means disable". Either the default should be "" or the description should be updated; otherwise users will be misled and stats collection will be enabled by default in practice.
Suggested change:

```diff
-+ "Empty string ('') (default) means disable statistics collection completely. "
-+ "The value '*' means collect statistics for all non-binary columns. "
++ "Empty string ('') means disable statistics collection completely. "
++ "The value '*' (default) means collect statistics for all non-binary columns. "
```
```java
 * @return Optional containing the list of column names if specific columns are configured,
 *     empty if all non-binary columns should be collected ("*" configuration), null if
 *     statistics collection is disabled (empty string configuration)
 */
public Optional<List<String>> getStatisticsColumns() {
    String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
    if (columnsStr.isEmpty()) {
        return null; // null means statistics collection is disabled
    }
    if ("*".equals(columnsStr)) {
        return Optional.empty(); // Empty means collect all non-binary columns
    }
```
getStatisticsColumns() returns null when statistics are disabled, even though the return type is Optional<List<String>>. Returning null from an Optional-typed method defeats the purpose of Optional and is easy to misuse (NPE risk). Consider returning Optional.empty() for the "not configured" case and using isStatisticsEnabled() to distinguish disabled vs wildcard, or introduce a small tri-state type to model {disabled, all, subset}.
Suggested change:

```diff
- * @return Optional containing the list of column names if specific columns are configured,
- *     empty if all non-binary columns should be collected ("*" configuration), null if
- *     statistics collection is disabled (empty string configuration)
- */
-public Optional<List<String>> getStatisticsColumns() {
-    String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
-    if (columnsStr.isEmpty()) {
-        return null; // null means statistics collection is disabled
-    }
-    if ("*".equals(columnsStr)) {
-        return Optional.empty(); // Empty means collect all non-binary columns
-    }
+ * <p>This method only returns a non-empty {@link Optional} when statistics collection is
+ * configured for a specific subset of columns. For other configurations, use
+ * {@link #isStatisticsEnabled()} and {@link #isCollectAllNonBinaryColumns()} to distinguish
+ * between disabled and wildcard ("*") collection.
+ *
+ * @return Optional containing the list of column names if specific columns are configured,
+ *     or {@link Optional#empty()} if statistics are disabled (empty string configuration) or
+ *     all non-binary columns should be collected ("*" configuration)
+ */
+public Optional<List<String>> getStatisticsColumns() {
+    String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
+    if (columnsStr.isEmpty() || "*".equals(columnsStr)) {
+        // Empty Optional means either statistics are disabled or all non-binary columns
+        // should be collected. Use isStatisticsEnabled() and isCollectAllNonBinaryColumns()
+        // to distinguish these cases.
+        return Optional.empty();
+    }
```
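The review also mentions a small tri-state type modeling {disabled, all, subset} as an alternative to overloading `Optional`. A minimal sketch of that idea follows; the class, enum, and method names are hypothetical and not part of the PR:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical tri-state result for the table.statistics.columns setting. */
final class StatisticsColumns {
    enum Mode { DISABLED, ALL_NON_BINARY, SUBSET }

    private final Mode mode;
    private final List<String> columns; // non-empty only in SUBSET mode

    private StatisticsColumns(Mode mode, List<String> columns) {
        this.mode = mode;
        this.columns = columns;
    }

    /** Parses the raw config value: "" -> disabled, "*" -> all non-binary, else a column list. */
    static StatisticsColumns parse(String raw) {
        if (raw == null || raw.isEmpty()) {
            return new StatisticsColumns(Mode.DISABLED, Collections.emptyList());
        }
        if ("*".equals(raw)) {
            return new StatisticsColumns(Mode.ALL_NON_BINARY, Collections.emptyList());
        }
        return new StatisticsColumns(Mode.SUBSET, Arrays.asList(raw.split("\\s*,\\s*")));
    }

    Mode mode() { return mode; }

    List<String> columns() { return columns; }
}
```

With this shape, callers switch on `mode()` instead of distinguishing `null` from `Optional.empty()`, which removes the NPE risk entirely.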
```java
 * @param fieldIndex The field index
 * @return The null count for the field
 */
public long getNullCount(int fieldIndex) {
    return statsNullCounts[fieldIndex] != null ? statsNullCounts[fieldIndex].longValue() : 0L;
```
getNullCount(int fieldIndex) indexes directly into statsNullCounts, but statsNullCounts is sized by the number of stats columns (statsIndexMapping), not the full schema field count. This will return wrong results (or throw) when statistics are collected for a subset of columns. It should map fieldIndex through reversedStatsIndexMapping (and likely throw/return sentinel when a column has no stats).
Suggested change:

```diff
- * @param fieldIndex The field index
- * @return The null count for the field
- */
-public long getNullCount(int fieldIndex) {
-    return statsNullCounts[fieldIndex] != null ? statsNullCounts[fieldIndex].longValue() : 0L;
+ * @param fieldIndex The field index in the full row schema
+ * @return The null count for the field, or -1L if no statistics are available
+ */
+public long getNullCount(int fieldIndex) {
+    checkArgument(
+            fieldIndex >= 0 && fieldIndex < rowType.getFieldCount(),
+            "fieldIndex out of range: " + fieldIndex);
+    int statsIndex = reversedStatsIndexMapping[fieldIndex];
+    if (statsIndex < 0) {
+        // No statistics collected for this column; use the same sentinel as getNullCounts()
+        return -1L;
+    }
+    Long count = statsNullCounts[statsIndex];
+    return count != null ? count.longValue() : 0L;
```
```java
// Collect all bytes for CRC computation
int totalCrcBytes = bytesView.getBytesLength() - schemaIdOff;
byte[] crcBuffer = new byte[totalCrcBytes];
int pos = 0;
```
computeCrc() allocates a byte[] of size bytesView.getBytesLength() - schemaIdOff and copies the entire batch into it before computing CRC. For large batches this is a significant allocation/GC cost and can cause OOM. Prefer computing the checksum incrementally (e.g., Checksum crc = Crc32C.create() + Checksums.update(...) over header slice, statisticsBytes, then the remaining MemorySegment segments) to avoid materializing the whole batch on heap.
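The incremental approach the comment describes can be demonstrated with the JDK's built-in `CRC32C` (the Fluss `Crc32C`/`Checksums` helpers named in the comment are assumed to behave analogously): feeding one `Checksum` the header slice, the statistics bytes, and each payload segment in turn yields the same value as checksumming one concatenated buffer, without materializing the whole batch on heap.

```java
import java.util.zip.CRC32C;

final class IncrementalCrcDemo {
    /** Computes CRC32C over several byte[] slices without concatenating them. */
    static long crcOfSlices(byte[][] slices) {
        CRC32C crc = new CRC32C();
        for (byte[] slice : slices) {
            crc.update(slice, 0, slice.length); // feed each segment incrementally
        }
        return crc.getValue();
    }

    /** Reference: CRC32C over one contiguous buffer. */
    static long crcOfWhole(byte[] whole) {
        CRC32C crc = new CRC32C();
        crc.update(whole, 0, whole.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] header = {1, 2, 3, 4};
        byte[] stats = {5, 6};
        byte[] records = {7, 8, 9};
        // Build the concatenated equivalent only for the comparison.
        byte[] whole = new byte[9];
        System.arraycopy(header, 0, whole, 0, 4);
        System.arraycopy(stats, 0, whole, 4, 2);
        System.arraycopy(records, 0, whole, 6, 3);
        long incremental = crcOfSlices(new byte[][] {header, stats, records});
        if (incremental != crcOfWhole(whole)) {
            throw new AssertionError("incremental CRC must match whole-buffer CRC");
        }
    }
}
```

In the builder, the slices would be the header region from `schemaIdOff`, the serialized statistics, and the record `MemorySegment`s, so the transient `crcBuffer` allocation disappears entirely.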
```java
readLogHeaderFullyOrFail(channel, logHeaderBuffer, position);
logHeaderBuffer.rewind();
byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
logHeaderBuffer.rewind();
```
projectRecordBatch reads a full header into a V2-sized logHeaderBuffer via readLogHeaderFullyOrFail(...), but that helper currently only validates short reads for V0/V1. With magic == V2, a truncated file/header can slip through and later reads may fail with less-informative errors. Consider extending readLogHeaderFullyOrFail to explicitly validate V2 header size when magic == LOG_MAGIC_VALUE_V2.
Suggested change:

```diff
 logHeaderBuffer.rewind();
+// For V2 batches, ensure we have read the full V2 header; otherwise,
+// a truncated header could slip through and cause less-informative errors later.
+if (magic == LOG_MAGIC_VALUE_V2) {
+    logHeaderBuffer.rewind();
+    readFullyOrFail(channel, logHeaderBuffer, position, V2_RECORD_BATCH_HEADER_SIZE);
+    logHeaderBuffer.rewind();
+}
```
```java
// Calculate the new batch size without statistics
int newBatchSizeInBytes = sizeInBytes() - statisticsLength;
// Load the original header
byte[] modifiedHeaderBytes = new byte[headerSize];
cachedHeaderBuffer.rewind();
cachedHeaderBuffer.get(modifiedHeaderBytes);
ByteBuffer modifiedHeader = ByteBuffer.wrap(modifiedHeaderBytes);
modifiedHeader.order(ByteOrder.LITTLE_ENDIAN);

// Update the length field
modifiedHeader.position(LENGTH_OFFSET);
modifiedHeader.putInt(newBatchSizeInBytes - LOG_OVERHEAD);

// Clear statistics information from header
LogRecordBatchFormat.clearStatisticsFromHeader(modifiedHeader, magic);
```
createTrimmedBytesView updates the LENGTH field and clears the statistics flag/length, but it does not recompute the batch CRC. Since CRC is validated from schemaIdOffset to the end of the batch, trimming bytes will make ensureValid() fail whenever CRC checking is enabled. Either recompute and rewrite the CRC in the modified header, or clearly document/guarantee this path is only used when CRC checks are skipped.
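A minimal sketch of the first option, recomputing and rewriting the CRC after the batch bytes change. The field offsets and layout below are illustrative only, not Fluss's actual V2 header layout:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32C;

final class TrimmedCrcDemo {
    // Illustrative layout: 4-byte length, 4-byte CRC, then the checksummed region.
    static final int LENGTH_OFFSET = 0;
    static final int CRC_OFFSET = 4;
    static final int CHECKSUMMED_FROM = 8;

    /** Recomputes and rewrites the CRC field after the batch bytes were modified. */
    static void rewriteCrc(ByteBuffer batch) {
        CRC32C crc = new CRC32C();
        ByteBuffer region = batch.duplicate();
        region.position(CHECKSUMMED_FROM);
        crc.update(region); // checksum everything after the CRC field
        batch.order(ByteOrder.LITTLE_ENDIAN).putInt(CRC_OFFSET, (int) crc.getValue());
    }

    /** Mirrors an ensureValid()-style check: stored CRC must match a fresh computation. */
    static boolean isValid(ByteBuffer batch) {
        CRC32C crc = new CRC32C();
        ByteBuffer region = batch.duplicate();
        region.position(CHECKSUMMED_FROM);
        crc.update(region);
        return batch.order(ByteOrder.LITTLE_ENDIAN).getInt(CRC_OFFSET) == (int) crc.getValue();
    }

    public static void main(String[] args) {
        ByteBuffer batch = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
        batch.putInt(LENGTH_OFFSET, 8);
        batch.putInt(CHECKSUMMED_FROM, 42);
        rewriteCrc(batch);
        if (!isValid(batch)) throw new AssertionError("CRC must validate after rewrite");
        // Mutating the checksummed bytes without rewriting the CRC invalidates the batch,
        // which is exactly why trimming the statistics section needs a CRC rewrite.
        batch.putInt(CHECKSUMMED_FROM, 43);
        if (isValid(batch)) throw new AssertionError("stale CRC must be detected");
    }
}
```

Applied to `createTrimmedBytesView`, the rewrite would run over the modified header from the schema-id offset plus the remaining record bytes, so a trimmed batch still passes `ensureValid()`.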
```java
assertThat(statistics.getMaxValues().getInt(0)).isEqualTo(5); // max id
assertThat(statistics.getNullCounts()[0]).isEqualTo(0); // no nulls

// Field 1: name (STRING) - string statistics are not collected in current implementation
```
The test comment says string statistics are not collected, but LogRecordBatchStatisticsCollector does collect STRING min/max (see collector's STRING case). This comment is misleading; either remove it or add assertions for the STRING field’s min/max to reflect the current behavior.
Suggested change:

```diff
-// Field 1: name (STRING) - string statistics are not collected in current implementation
+// Field 1: name (STRING)
```
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering.

- Add LogRecordBatchStatistics and related classes for statistics collection
- Add StatisticsConfigUtils for parsing table.statistics.columns configuration
- Extend DefaultLogRecordBatch to support V2 format with statistics
- Place statistics data between header and records with StatisticsLength field
- Add comprehensive tests for statistics collection and parsing
Force-pushed from 836948c to 51dd84c (compare).
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering.
Purpose
Linked issue: close #2885
Brief change log
Tests
API and Format
Documentation