From e794ec2c7f7b47b670b70a4068f40b9f15966033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 17 May 2026 19:43:25 +0200 Subject: [PATCH] GH-3530: Optimize DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, and DELTA_BYTE_ARRAY encoding/decoding DELTA_BINARY_PACKED reader: - Cache BytePackerForLong instances (packerCache) to eliminate repeated factory lookups per mini block - Add unpack32Values bulk method that processes 32 values per call instead of 8, reducing loop overhead - Replace ByteBuffer miniBlockByteBuffer with byte[] to avoid ByteBuffer.slice() allocation per mini block and enable the faster byte[]-based packer APIs DELTA_BINARY_PACKED integer writer: - Cache BytePackerForLong instances (packerCache) - Add pack32Values bulk packing method (32 values per call) DELTA_BINARY_PACKED long writer: - Cache BytePackerForLong instances (packerCache) - Add pack32Values bulk packing method (32 values per call) DELTA_BINARY_PACKED base writer: - Remove unused 3-argument constructor DELTA_LENGTH_BYTE_ARRAY writer: - Remove LittleEndianDataOutputStream wrapper; write directly to CapacityByteArrayOutputStream via BytesUtils - Add writeBytes(byte[],int,int) overload for direct byte array writes DELTA_BYTE_ARRAY reader: - Add ByteArraySliceOutputStream to eliminate temporary byte[] copies when materializing prefix+suffix in readBytes() DELTA_BYTE_ARRAY writer: - Use copy().getBytesUnsafe() and direct writeBytes(byte[],int,int) to avoid intermediate Binary allocations - Use Arrays.mismatch for prefix length computation, which is JVM-intrinsified for SIMD acceleration Test utilities: - Remove unused writeInts method from Utils JMH benchmarks: - DeltaBinaryPackedEncodingBenchmark: INT32/INT64 scalar encode with SEQUENTIAL, RANDOM, LOW_CARDINALITY, HIGH_CARDINALITY data patterns - DeltaBinaryPackedDecodingBenchmark: INT32/INT64 scalar decode - DeltaByteArrayEncodingBenchmark: BINARY/FLBA scalar encode with RANDOM/SORTED data and varying string/fixed lengths - DeltaByteArrayDecodingBenchmark: BINARY/FLBA scalar decode - DeltaLengthByteArrayEncodingBenchmark: BINARY scalar encode with UNIFORM_LENGTH/VARIABLE_LENGTH distributions - DeltaLengthByteArrayDecodingBenchmark: BINARY scalar decode - LongDeltaDecodingBenchmark: INT64 decode with 5 bit-width patterns (SEQUENTIAL_DENSE, SEQUENTIAL_STRIDED, RANDOM_SMALL, RANDOM_WIDE, TIMESTAMP_MILLIS) - Shared TestDataFactory for deterministic benchmark data generation --- parquet-benchmarks/pom.xml | 23 ++ .../DeltaBinaryPackedDecodingBenchmark.java | 149 ++++++++++ .../DeltaBinaryPackedEncodingBenchmark.java | 136 +++++++++ .../DeltaByteArrayDecodingBenchmark.java | 177 ++++++++++++ .../DeltaByteArrayEncodingBenchmark.java | 166 +++++++++++ ...DeltaLengthByteArrayDecodingBenchmark.java | 108 +++++++ ...DeltaLengthByteArrayEncodingBenchmark.java | 107 +++++++ .../LongDeltaDecodingBenchmark.java | 145 ++++++++++ .../parquet/benchmarks/TestDataFactory.java | 269 ++++++++++++++++++ .../delta/DeltaBinaryPackingValuesReader.java | 70 ++++- .../delta/DeltaBinaryPackingValuesWriter.java | 5 - ...taBinaryPackingValuesWriterForInteger.java | 26 +- ...DeltaBinaryPackingValuesWriterForLong.java | 28 +- .../DeltaLengthByteArrayValuesWriter.java | 20 +- .../deltastrings/DeltaByteArrayReader.java | 29 +- .../deltastrings/DeltaByteArrayWriter.java | 11 +- .../apache/parquet/column/values/Utils.java | 6 - .../TestDeltaLengthByteArray.java | 53 ++++ pom.xml | 1 + 19 files changed, 1469 insertions(+), 60 deletions(-) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/LongDeltaDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index d5a288b677..b1468502dc 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -52,6 +52,11 @@ parquet-common ${project.version} + + org.apache.parquet + parquet-encoding + ${project.version} + org.apache.parquet parquet-variant @@ -94,6 +99,18 @@ org.apache.maven.plugins maven-compiler-plugin + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + + org.openjdk.jmh.generators.BenchmarkProcessor + + org.apache.maven.plugins @@ -112,6 +129,12 @@ org.openjdk.jmh.Main + + META-INF/BenchmarkList + + + META-INF/CompilerHints + diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedDecodingBenchmark.java new file mode 100644 index 0000000000..66e4bc3204 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedDecodingBenchmark.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding-level micro-benchmarks for the DELTA_BINARY_PACKED encoding across + * the Parquet types that support it: {@code INT32} and {@code INT64}. + * Encoding benchmarks live in {@link DeltaBinaryPackedEncodingBenchmark}. + * + *

The {@code dataPattern} parameter exercises delta decoding across + * different value distributions: sequential (small constant deltas), random + * (large varying deltas), low-cardinality (many zero deltas from repeated + * values), and high-cardinality (all unique, shuffled). + * + *

Each invocation decodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class DeltaBinaryPackedDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "HIGH_CARDINALITY"}) + public String dataPattern; + + private byte[] intPage; + private byte[] longPage; + + @Setup(Level.Trial) + public void setup() throws IOException { + long seed = TestDataFactory.DEFAULT_SEED; + int distinct = TestDataFactory.LOW_CARDINALITY_DISTINCT; + + int[] intData; + long[] longData; + + switch (dataPattern) { + case "SEQUENTIAL": + intData = TestDataFactory.generateSequentialInts(VALUE_COUNT); + longData = TestDataFactory.generateSequentialLongs(VALUE_COUNT); + break; + case "RANDOM": + intData = TestDataFactory.generateRandomInts(VALUE_COUNT, seed); + longData = TestDataFactory.generateRandomLongs(VALUE_COUNT, seed); + break; + case "LOW_CARDINALITY": + intData = TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, seed); + longData = TestDataFactory.generateLowCardinalityLongs(VALUE_COUNT, distinct, seed); + break; + case "HIGH_CARDINALITY": + intData = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, seed); + longData = TestDataFactory.generateHighCardinalityLongs(VALUE_COUNT, seed); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + + // Pre-encode pages for decode benchmarks + { + ValuesWriter w = new DeltaBinaryPackingValuesWriterForInteger( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int v : intData) { + w.writeInteger(v); + } + intPage = w.getBytes().toByteArray(); + w.close(); + } + { + ValuesWriter w = + new DeltaBinaryPackingValuesWriterForLong(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (long v : longData) { + w.writeLong(v); + } + longPage = w.getBytes().toByteArray(); + w.close(); + } + } + + // ---- INT32 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeInt(Blackhole bh) throws IOException { + DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(intPage))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } + + // ---- INT64 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeLong(Blackhole bh) throws IOException { + DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(longPage))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readLong()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedEncodingBenchmark.java new file mode 100644 index 0000000000..f9a57bccc1 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaBinaryPackedEncodingBenchmark.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding-level micro-benchmarks for the DELTA_BINARY_PACKED encoding across + * the Parquet types that support it: {@code INT32} and {@code INT64}. + * Decoding benchmarks live in {@link DeltaBinaryPackedDecodingBenchmark}. + * + *

The {@code dataPattern} parameter exercises delta encoding across + * different value distributions: sequential (small constant deltas), random + * (large varying deltas), low-cardinality (many zero deltas from repeated + * values), and high-cardinality (all unique, shuffled). + * + *

Each invocation encodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class DeltaBinaryPackedEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "HIGH_CARDINALITY"}) + public String dataPattern; + + private int[] intData; + private long[] longData; + + @Setup(Level.Trial) + public void setup() { + long seed = TestDataFactory.DEFAULT_SEED; + int distinct = TestDataFactory.LOW_CARDINALITY_DISTINCT; + + switch (dataPattern) { + case "SEQUENTIAL": + intData = TestDataFactory.generateSequentialInts(VALUE_COUNT); + longData = TestDataFactory.generateSequentialLongs(VALUE_COUNT); + break; + case "RANDOM": + intData = TestDataFactory.generateRandomInts(VALUE_COUNT, seed); + longData = TestDataFactory.generateRandomLongs(VALUE_COUNT, seed); + break; + case "LOW_CARDINALITY": + intData = TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, seed); + longData = TestDataFactory.generateLowCardinalityLongs(VALUE_COUNT, distinct, seed); + break; + case "HIGH_CARDINALITY": + intData = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, seed); + longData = TestDataFactory.generateHighCardinalityLongs(VALUE_COUNT, seed); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + } + + // ---- Writer factories ---- + + private static DeltaBinaryPackingValuesWriterForInteger newIntWriter() { + return new DeltaBinaryPackingValuesWriterForInteger(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DeltaBinaryPackingValuesWriterForLong newLongWriter() { + return new DeltaBinaryPackingValuesWriterForLong(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + // ---- INT32 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeInt() throws IOException { + ValuesWriter w = newIntWriter(); + for (int v : intData) { + w.writeInteger(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- INT64 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeLong() throws IOException { + ValuesWriter w = newLongWriter(); + for (long v : longData) { + w.writeLong(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayDecodingBenchmark.java new file mode 100644 index 0000000000..86e80b8a45 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayDecodingBenchmark.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding micro-benchmarks for the DELTA_BYTE_ARRAY encoding across + * the Parquet types that support it: {@code BINARY} and + * {@code FIXED_LEN_BYTE_ARRAY}. + * + *

Encoding benchmarks live in {@link DeltaByteArrayEncodingBenchmark}. + * + *

BINARY and FIXED_LEN_BYTE_ARRAY benchmarks use separate inner + * {@link State} classes so their independent parameters ({@code stringLength} + * vs {@code fixedLength}) do not form a JMH cross-product. + * + *

Each invocation decodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +public class DeltaByteArrayDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + // ---- Inner state classes ---- + + /** + * Pre-encoded BINARY pages parameterized by string length and data pattern. + * Sorted data produces pages with extensive prefix sharing; random data + * produces pages with no shared prefixes. + */ + @State(Scope.Thread) + public static class BinaryState { + + @Param({"10", "100", "1000"}) + public int stringLength; + + @Param({"RANDOM", "SORTED"}) + public String dataPattern; + + byte[] encoded; + + @Setup(Level.Trial) + public void setup() throws IOException { + Binary[] data; + switch (dataPattern) { + case "RANDOM": + data = TestDataFactory.generateBinaryData( + VALUE_COUNT, stringLength, 0, TestDataFactory.DEFAULT_SEED); + break; + case "SORTED": + data = TestDataFactory.generateSortedBinaryData( + VALUE_COUNT, stringLength, TestDataFactory.DEFAULT_SEED); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + + ValuesWriter w = new DeltaByteArrayWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + encoded = w.getBytes().toByteArray(); + w.close(); + } + } + + /** + * Pre-encoded FIXED_LEN_BYTE_ARRAY pages parameterized by fixed length and + * data pattern. Fixed lengths map to common logical types: 2 = FLOAT16, + * 12 = INT96, 16 = UUID. + */ + @State(Scope.Thread) + public static class FlbaState { + + @Param({"2", "12", "16"}) + public int fixedLength; + + @Param({"RANDOM", "SORTED"}) + public String dataPattern; + + byte[] encoded; + + @Setup(Level.Trial) + public void setup() throws IOException { + Binary[] data; + switch (dataPattern) { + case "RANDOM": + data = TestDataFactory.generateFixedLenByteArrays( + VALUE_COUNT, fixedLength, 0, TestDataFactory.DEFAULT_SEED); + break; + case "SORTED": + data = TestDataFactory.generateSortedFixedLenByteArrays( + VALUE_COUNT, fixedLength, TestDataFactory.DEFAULT_SEED); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + + ValuesWriter w = new DeltaByteArrayWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + encoded = w.getBytes().toByteArray(); + w.close(); + } + } + + // ---- BINARY ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBinary(BinaryState state, Blackhole bh) throws IOException { + DeltaByteArrayReader reader = new DeltaByteArrayReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.encoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + // ---- FIXED_LEN_BYTE_ARRAY ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeFlba(FlbaState state, Blackhole bh) throws IOException { + DeltaByteArrayReader reader = new DeltaByteArrayReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.encoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayEncodingBenchmark.java new file mode 100644 index 0000000000..0fb8bfc397 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaByteArrayEncodingBenchmark.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding micro-benchmarks for the DELTA_BYTE_ARRAY encoding (also known as + * incremental or front-compression encoding) across the Parquet types that + * support it: {@code BINARY} and {@code FIXED_LEN_BYTE_ARRAY}. + * + *

Decoding benchmarks live in {@link DeltaByteArrayDecodingBenchmark}. + * + *

BINARY and FIXED_LEN_BYTE_ARRAY benchmarks use separate inner + * {@link State} classes so their independent parameters ({@code stringLength} + * vs {@code fixedLength}) do not form a JMH cross-product. + * + *

Each invocation encodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +public class DeltaByteArrayEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + // ---- Inner state classes ---- + + /** + * BINARY data parameterized by string length and data pattern. Sorted data + * exercises prefix sharing (the encoding's primary design intent); random + * data is the worst case with no shared prefixes. + */ + @State(Scope.Thread) + public static class BinaryState { + + @Param({"10", "100", "1000"}) + public int stringLength; + + @Param({"RANDOM", "SORTED"}) + public String dataPattern; + + Binary[] data; + + @Setup(Level.Trial) + public void setup() { + switch (dataPattern) { + case "RANDOM": + data = TestDataFactory.generateBinaryData( + VALUE_COUNT, stringLength, 0, TestDataFactory.DEFAULT_SEED); + break; + case "SORTED": + data = TestDataFactory.generateSortedBinaryData( + VALUE_COUNT, stringLength, TestDataFactory.DEFAULT_SEED); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + } + } + + /** + * FIXED_LEN_BYTE_ARRAY data parameterized by fixed length and data pattern. + * Fixed lengths map to common logical types: 2 = FLOAT16, 12 = INT96, 16 = UUID. + */ + @State(Scope.Thread) + public static class FlbaState { + + @Param({"2", "12", "16"}) + public int fixedLength; + + @Param({"RANDOM", "SORTED"}) + public String dataPattern; + + Binary[] data; + + @Setup(Level.Trial) + public void setup() { + switch (dataPattern) { + case "RANDOM": + data = TestDataFactory.generateFixedLenByteArrays( + VALUE_COUNT, fixedLength, 0, TestDataFactory.DEFAULT_SEED); + break; + case "SORTED": + data = TestDataFactory.generateSortedFixedLenByteArrays( + VALUE_COUNT, fixedLength, TestDataFactory.DEFAULT_SEED); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + } + } + + // ---- Writer factory ---- + + private static DeltaByteArrayWriter newWriter() { + return new DeltaByteArrayWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + // ---- BINARY ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeBinary(BinaryState state) throws IOException { + ValuesWriter w = newWriter(); + for (Binary v : state.data) { + w.writeBytes(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- FIXED_LEN_BYTE_ARRAY ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeFlba(FlbaState state) throws IOException { + ValuesWriter w = newWriter(); + for (Binary v : state.data) { + w.writeBytes(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayDecodingBenchmark.java new file mode 100644 index 0000000000..8915c74e8d --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayDecodingBenchmark.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding micro-benchmarks for the DELTA_LENGTH_BYTE_ARRAY encoding. + * Encoding benchmarks live in {@link DeltaLengthByteArrayEncodingBenchmark}. + * + *

The {@code stringLength} parameter exercises the decoding across different + * value sizes. The {@code dataPattern} parameter controls whether all values + * have identical length (UNIFORM_LENGTH) or varying lengths (VARIABLE_LENGTH). + * + *

Each invocation decodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class DeltaLengthByteArrayDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + @Param({"10", "100", "1000"}) + public int stringLength; + + @Param({"UNIFORM_LENGTH", "VARIABLE_LENGTH"}) + public String dataPattern; + + private byte[] encoded; + + @Setup(Level.Trial) + public void setup() throws IOException { + Binary[] data; + switch (dataPattern) { + case "UNIFORM_LENGTH": + data = TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, 0, TestDataFactory.DEFAULT_SEED); + break; + case "VARIABLE_LENGTH": + data = TestDataFactory.generateVariableLengthBinaryData( + VALUE_COUNT, stringLength, TestDataFactory.DEFAULT_SEED); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + + ValuesWriter w = new DeltaLengthByteArrayValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + encoded = w.getBytes().toByteArray(); + w.close(); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decode(Blackhole bh) throws IOException { + DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayEncodingBenchmark.java new file mode 100644 index 0000000000..1d23feb5e4 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DeltaLengthByteArrayEncodingBenchmark.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding micro-benchmarks for the DELTA_LENGTH_BYTE_ARRAY encoding, which + * stores variable-length binary values by delta-encoding their lengths (via + * DELTA_BINARY_PACKED) followed by the concatenated raw bytes. + * + *

Decoding benchmarks live in {@link DeltaLengthByteArrayDecodingBenchmark}. + * + *

The {@code stringLength} parameter exercises the encoding across different + * value sizes. The {@code dataPattern} parameter controls whether all values + * have identical length (UNIFORM_LENGTH — length deltas are all zero) or + * varying lengths (VARIABLE_LENGTH — non-trivial deltas that exercise the + * DELTA_BINARY_PACKED sub-encoding of lengths). + * + *

Each invocation encodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class DeltaLengthByteArrayEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + @Param({"10", "100", "1000"}) + public int stringLength; + + @Param({"UNIFORM_LENGTH", "VARIABLE_LENGTH"}) + public String dataPattern; + + private Binary[] data; + + @Setup(Level.Trial) + public void setup() { + switch (dataPattern) { + case "UNIFORM_LENGTH": + data = TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, 0, TestDataFactory.DEFAULT_SEED); + break; + case "VARIABLE_LENGTH": + data = TestDataFactory.generateVariableLengthBinaryData( + VALUE_COUNT, stringLength, TestDataFactory.DEFAULT_SEED); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + } + + private static DeltaLengthByteArrayValuesWriter newWriter() { + return new DeltaLengthByteArrayValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encode() throws IOException { + ValuesWriter w = newWriter(); + for (Binary v : data) { + w.writeBytes(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/LongDeltaDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/LongDeltaDecodingBenchmark.java new file mode 100644 index 0000000000..533ebc39eb --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/LongDeltaDecodingBenchmark.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding micro-benchmark for INT64 values encoded with DELTA_BINARY_PACKED. + * + *

Exercises the {@link DeltaBinaryPackingValuesReader} long path, which invokes the + * long bit-packing unpacker ({@code BytePackerForLong}) once per miniblock of 32 values. + * Covers the per-value decode entry point ({@code readLong}). + * + *

Data patterns cover the bit-width ranges that drive which packer is selected: + *

    + *
  • {@code SEQUENTIAL_DENSE}: deltas == 1 → 1 bit width + *
  • {@code SEQUENTIAL_STRIDED}: deltas of ~1000 → ~10-bit width + *
  • {@code RANDOM_SMALL}: deltas in [-256, 256] → ~9-bit width + *
  • {@code RANDOM_WIDE}: deltas spanning 48+ bits → high-bit widths (the worst case + * for the generated unpacker bodies) + *
  • {@code TIMESTAMP_MILLIS}: monotonically increasing with jitter, ~17-bit width + *
+ * + *

This complements {@link DeltaBinaryPackedDecodingBenchmark#decodeInt} which only covers INT32. + * The long path is separately relevant because the generated {@code BytePackerForLong} + * code is twice the size of the INT32 packer and has been observed to stress the JIT + * inlining heuristics differently. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class LongDeltaDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + + @Param({"SEQUENTIAL_DENSE", "SEQUENTIAL_STRIDED", "RANDOM_SMALL", "RANDOM_WIDE", "TIMESTAMP_MILLIS"}) + public String dataPattern; + + private byte[] encoded; + + @Setup(Level.Trial) + public void setup() throws IOException { + long[] data = generateData(); + try (DeltaBinaryPackingValuesWriterForLong writer = + new DeltaBinaryPackingValuesWriterForLong(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (long v : data) { + writer.writeLong(v); + } + encoded = writer.getBytes().toByteArray(); + } + } + + private long[] generateData() { + long[] data = new long[VALUE_COUNT]; + Random r = new Random(42); + switch (dataPattern) { + case "SEQUENTIAL_DENSE": + for (int i = 0; i < VALUE_COUNT; i++) { + data[i] = 1_000_000L + i; + } + return data; + case "SEQUENTIAL_STRIDED": + long v = 1_000_000L; + for (int i = 0; i < VALUE_COUNT; i++) { + v += 500 + r.nextInt(1000); + data[i] = v; + } + return data; + case "RANDOM_SMALL": + long base = 0; + for (int i = 0; i < VALUE_COUNT; i++) { + base += r.nextInt(513) - 256; + data[i] = base; + } + return data; + case "RANDOM_WIDE": + for (int i = 0; i < VALUE_COUNT; i++) { + // Deltas spanning ~48 bits forces the delta-packer to pick a high bit-width. + data[i] = r.nextLong() >> 16; + } + return data; + case "TIMESTAMP_MILLIS": + long t = 1_700_000_000_000L; // Nov 2023 epoch-millis baseline + for (int i = 0; i < VALUE_COUNT; i++) { + t += 50 + r.nextInt(100_000); // 50ms-100s jitter + data[i] = t; + } + return data; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDeltaLong(Blackhole bh) throws IOException { + DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readLong()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java new file mode 100644 index 0000000000..bfd33eab66 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; +import org.apache.parquet.io.api.Binary; + +/** + * Utility class for generating test schemas and data for benchmarks. + */ +public final class TestDataFactory { + + /** Number of distinct values for low-cardinality data patterns. */ + public static final int LOW_CARDINALITY_DISTINCT = 100; + + /** Default RNG seed used across benchmarks for deterministic data. */ + public static final long DEFAULT_SEED = 42L; + + private TestDataFactory() {} + + // ---- Integer data generation for encoding benchmarks ---- + + /** + * Generates sequential integers: 0, 1, 2, ... + */ + public static int[] generateSequentialInts(int count) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random integers using the given seed. + */ + public static int[] generateRandomInts(int count, long seed) { + Random random = new Random(seed); + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(); + } + return data; + } + + /** + * Generates low-cardinality integers (values drawn from a small set) using the given seed. + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) { + Random random = new Random(seed); + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(distinctValues); + } + return data; + } + + /** + * Generates high-cardinality integers (all unique in randomized order) using the given seed. + */ + public static int[] generateHighCardinalityInts(int count, long seed) { + Random random = new Random(seed); + int[] data = generateSequentialInts(count); + for (int i = count - 1; i > 0; i--) { + int swapIndex = random.nextInt(i + 1); + int tmp = data[i]; + data[i] = data[swapIndex]; + data[swapIndex] = tmp; + } + return data; + } + + // ---- Long data generation for encoding benchmarks ---- + + /** + * Generates sequential longs: 0, 1, 2, ... + */ + public static long[] generateSequentialLongs(int count) { + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random longs using the given seed. + */ + public static long[] generateRandomLongs(int count, long seed) { + Random random = new Random(seed); + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextLong(); + } + return data; + } + + /** + * Generates low-cardinality longs (values drawn from a small set). + */ + public static long[] generateLowCardinalityLongs(int count, int distinctValues, long seed) { + Random random = new Random(seed); + long[] palette = new long[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextLong(); + } + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + /** + * Generates high-cardinality longs (all unique, shuffled). + */ + public static long[] generateHighCardinalityLongs(int count, long seed) { + Random random = new Random(seed); + long[] data = generateSequentialLongs(count); + for (int i = count - 1; i > 0; i--) { + int swapIndex = random.nextInt(i + 1); + long tmp = data[i]; + data[i] = data[swapIndex]; + data[swapIndex] = tmp; + } + return data; + } + + // ---- Fixed-length byte array data generation for encoding benchmarks ---- + + /** + * Generates fixed-length byte arrays with the specified cardinality. + * + * @param count number of values + * @param length byte length of each value + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + */ + public static Binary[] generateFixedLenByteArrays(int count, int length, int distinct, long seed) { + Random random = new Random(seed); + if (distinct > 0) { + Binary[] palette = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + palette[i] = Binary.fromConstantByteArray(bytes); + } + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinct)]; + } + return data; + } else { + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + data[i] = Binary.fromConstantByteArray(bytes); + } + return data; + } + } + + // ---- Binary data generation for encoding benchmarks ---- + + /** + * Generates binary strings of the given length with the specified cardinality. + * + * @param count number of values + * @param stringLength length of each string + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + * @return array of Binary values + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) { + Random random = new Random(seed); + Binary[] data = new Binary[count]; + if (distinct > 0) { + // Pre-generate the distinct values + Binary[] dictionary = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + dictionary[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + for (int i = 0; i < count; i++) { + data[i] = dictionary[random.nextInt(distinct)]; + } + } else { + // All unique + for (int i = 0; i < count; i++) { + data[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + } + return data; + } + + // ---- Sorted data generation for delta encoding benchmarks ---- + + /** + * Generates all-unique binary strings of the given length, sorted in natural + * ({@link Binary#compareTo}) order. Useful for benchmarking DELTA_BYTE_ARRAY + * encoding, which benefits from prefix sharing between consecutive values. + */ + public static Binary[] generateSortedBinaryData(int count, int stringLength, long seed) { + Binary[] data = generateBinaryData(count, stringLength, 0, seed); + Arrays.sort(data); + return data; + } + + /** + * Generates all-unique fixed-length byte arrays, sorted in natural + * ({@link Binary#compareTo}) order. Useful for benchmarking DELTA_BYTE_ARRAY + * encoding with FIXED_LEN_BYTE_ARRAY values. + */ + public static Binary[] generateSortedFixedLenByteArrays(int count, int fixedLength, long seed) { + Binary[] data = generateFixedLenByteArrays(count, fixedLength, 0, seed); + Arrays.sort(data); + return data; + } + + // ---- Variable-length data generation for delta encoding benchmarks ---- + + /** + * Generates all-unique binary strings with lengths uniformly distributed in + * {@code [1, maxLength]}. Useful for benchmarking DELTA_LENGTH_BYTE_ARRAY + * encoding, where non-zero length deltas exercise the DELTA_BINARY_PACKED + * sub-encoding of lengths (unlike uniform-length data where deltas are all zero). + * + * @param count number of values + * @param maxLength maximum string length (inclusive) + * @param seed RNG seed + */ + public static Binary[] generateVariableLengthBinaryData(int count, int maxLength, long seed) { + Random random = new Random(seed); + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + int length = 1 + random.nextInt(maxLength); + data[i] = Binary.fromConstantByteArray(randomString(length, random).getBytes(StandardCharsets.UTF_8)); + } + return data; + } + + private static String randomString(int length, Random random) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('a' + random.nextInt(26))); + } + return sb.toString(); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java index 259ebc09c0..535bdcb49e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java @@ -19,7 +19,6 @@ package org.apache.parquet.column.values.delta; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; @@ -53,6 +52,21 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { private DeltaBinaryPackingConfig config; private int[] bitWidths; + /** + * Reusable byte buffer for holding the packed bytes of a single mini block. + * Avoids allocating a fresh {@code ByteBuffer.slice()} per mini block in + * {@link #unpackMiniBlock(BytePackerForLong)}. Sized lazily to the maximum + * miniblock byte count seen so far. + */ + private byte[] miniBlockByteBuffer = new byte[0]; + + /** + * Cache of {@link BytePackerForLong} instances keyed by bit width (0..64). + * The default packer factory does an array lookup, but caching the resolved + * packer instance per reader avoids two virtual dispatches per mini block. + */ + private final BytePackerForLong[] packerCache = new BytePackerForLong[65]; + /** * eagerly loads all the data into memory */ @@ -130,7 +144,12 @@ private void loadNewBlockToBuffer() throws IOException { // mini block is atomic for reading, we read a mini block when there are more values left int i; for (i = 0; i < config.miniBlockNumInABlock && valuesBuffered < totalValueCount; i++) { - BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidths[i]); + int bitWidth = bitWidths[i]; + BytePackerForLong packer = packerCache[bitWidth]; + if (packer == null) { + packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + packerCache[bitWidth] = packer; + } unpackMiniBlock(packer); } @@ -143,22 +162,49 @@ private void loadNewBlockToBuffer() throws IOException { } /** - * mini block has a size of 8*n, unpack 8 value each time + * Mini block has a size of 8*n. Reads the packed bytes once into a reused {@code byte[]} + * buffer (avoiding the per-mini-block {@code ByteBuffer.slice()} allocation), then unpacks + * 32 values at a time using the byte[] form of the packer (which is faster than the + * ByteBuffer form per the comment in {@code ByteBitPackingValuesReader}). For the + * default mini-block size of 32 values this collapses to a single {@code unpack32Values} + * call per mini block. Any residual {@code 8*n} group (mini-block size not a multiple of 32) + * falls back to {@code unpack8Values}. * * @param packer the packer created from bitwidth of current mini block */ private void unpackMiniBlock(BytePackerForLong packer) throws IOException { - for (int j = 0; j < config.miniBlockSizeInValues; j += 8) { - unpack8Values(packer); + final int bitWidth = packer.getBitWidth(); + final int valueCount = config.miniBlockSizeInValues; + final int byteCount = (valueCount / 8) * bitWidth; + + if (miniBlockByteBuffer.length < byteCount) { + miniBlockByteBuffer = new byte[byteCount]; + } + int read = 0; + while (read < byteCount) { + int n = in.read(miniBlockByteBuffer, read, byteCount - read); + if (n < 0) { + throw new ParquetDecodingException( + "not enough bytes for mini block: needed " + byteCount + ", got " + read); + } + read += n; } - } - private void unpack8Values(BytePackerForLong packer) throws IOException { - // get a single buffer of 8 values. most of the time, this won't require a copy - // TODO: update the packer to consume from an InputStream - ByteBuffer buffer = in.slice(packer.getBitWidth()); - packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered); - this.valuesBuffered += 8; + // Unpack 32 values at a time when possible; fall back to 8 for any residual. + int valueIndex = 0; + int byteIndex = 0; + final int step32 = bitWidth * 4; + while (valueIndex + 32 <= valueCount) { + packer.unpack32Values(miniBlockByteBuffer, byteIndex, valuesBuffer, valuesBuffered + valueIndex); + valueIndex += 32; + byteIndex += step32; + } + while (valueIndex < valueCount) { + packer.unpack8Values(miniBlockByteBuffer, byteIndex, valuesBuffer, valuesBuffered + valueIndex); + valueIndex += 8; + byteIndex += bitWidth; + } + valuesBuffered += valueCount; } private void readBitWidthsForMiniBlocks() { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java index efcd902765..97e26b1952 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java @@ -76,11 +76,6 @@ public abstract class DeltaBinaryPackingValuesWriter extends ValuesWriter { */ protected byte[] miniBlockByteBuffer; - // TODO: remove this. - public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize, ByteBufferAllocator allocator) { - this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator); - } - public DeltaBinaryPackingValuesWriter( int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize, ByteBufferAllocator allocator) { this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java index 1988aba8f7..4948975c14 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java @@ -60,6 +60,12 @@ public class DeltaBinaryPackingValuesWriterForInteger extends DeltaBinaryPacking */ private int minDeltaInCurrentBlock = Integer.MAX_VALUE; + /** + * Cache of BytePacker instances indexed by bit width [0, 32]. + * Avoids a factory dispatch + array load per miniblock flush. + */ + private final BytePacker[] packerCache = new BytePacker[MAX_BITWIDTH + 1]; + public DeltaBinaryPackingValuesWriterForInteger(int slabSize, int pageSize, ByteBufferAllocator allocator) { this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator); } @@ -116,18 +122,16 @@ private void flushBlockBuffer() { for (int i = 0; i < miniBlocksToFlush; i++) { // writing i th miniblock int currentBitWidth = bitWidths[i]; - int blockOffset = 0; - BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth); - int miniBlockStart = i * config.miniBlockSizeInValues; - for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) { // 8 values per pack - // mini block is atomic in terms of flushing - // This may write more values when reach to the end of data writing to last mini block, - // since it may not be aligned to miniblock, - // but doesn't matter. The reader uses total count to see if reached the end. - packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset); - blockOffset += currentBitWidth; + BytePacker packer = packerCache[currentBitWidth]; + if (packer == null) { + packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth); + packerCache[currentBitWidth] = packer; } - baos.write(miniBlockByteBuffer, 0, blockOffset); + int miniBlockStart = i * config.miniBlockSizeInValues; + // Mini blocks are always flushed as full 32-value groups in the current format. + // Use the packer's 32-value entry point to avoid four pack8Values calls per miniblock. + packer.pack32Values(deltaBlockBuffer, miniBlockStart, miniBlockByteBuffer, 0); + baos.write(miniBlockByteBuffer, 0, currentBitWidth * 4); } minDeltaInCurrentBlock = Integer.MAX_VALUE; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java index 890ddb942b..c08868dfad 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java @@ -60,6 +60,12 @@ public class DeltaBinaryPackingValuesWriterForLong extends DeltaBinaryPackingVal */ private long minDeltaInCurrentBlock = Long.MAX_VALUE; + /** + * Cache of BytePackerForLong instances indexed by bit width [0, 64]. + * Avoids a factory dispatch + array load per miniblock flush. + */ + private final BytePackerForLong[] packerCache = new BytePackerForLong[MAX_BITWIDTH + 1]; + public DeltaBinaryPackingValuesWriterForLong(int slabSize, int pageSize, ByteBufferAllocator allocator) { this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator); } @@ -116,20 +122,16 @@ private void flushBlockBuffer() { for (int i = 0; i < miniBlocksToFlush; i++) { // writing i th miniblock int currentBitWidth = bitWidths[i]; - int blockOffset = 0; - // TODO: should this cache the packer? - BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth); - int miniBlockStart = i * config.miniBlockSizeInValues; - // pack values into the miniblock buffer, 8 at a time to get exactly currentBitWidth bytes - for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) { - // mini block is atomic in terms of flushing - // This may write more values when reach to the end of data writing to last mini block, - // since it may not be aligned to miniblock, - // but doesn't matter. The reader uses total count to see if reached the end. - packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset); - blockOffset += currentBitWidth; + BytePackerForLong packer = packerCache[currentBitWidth]; + if (packer == null) { + packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth); + packerCache[currentBitWidth] = packer; } - baos.write(miniBlockByteBuffer, 0, blockOffset); + int miniBlockStart = i * config.miniBlockSizeInValues; + // Mini blocks are always flushed as full 32-value groups in the current format. + // Use the packer's 32-value entry point to avoid four pack8Values calls per miniblock. + packer.pack32Values(deltaBlockBuffer, miniBlockStart, miniBlockByteBuffer, 0); + baos.write(miniBlockByteBuffer, 0, currentBitWidth * 4); } minDeltaInCurrentBlock = Long.MAX_VALUE; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java index ac63ff52ef..f3c33dc417 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java @@ -22,7 +22,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter; @@ -46,11 +45,9 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter { private ValuesWriter lengthWriter; private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) { arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); - out = new LittleEndianDataOutputStream(arrayOut); lengthWriter = new DeltaBinaryPackingValuesWriterForInteger( DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES, DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS, @@ -63,12 +60,22 @@ public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBuffe public void writeBytes(Binary v) { try { lengthWriter.writeInteger(v.length()); - v.writeTo(out); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not write bytes", e); } } + /** + * Writes raw bytes directly, avoiding Binary object creation overhead. + * Used by {@link org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter} + * to write suffix bytes without creating an intermediate Binary.slice(). + */ + public void writeBytes(byte[] data, int offset, int length) { + lengthWriter.writeInteger(length); + arrayOut.write(data, offset, length); + } + @Override public long getBufferedSize() { return lengthWriter.getBufferedSize() + arrayOut.size(); @@ -76,11 +83,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.concat(lengthWriter.getBytes(), BytesInput.from(arrayOut)); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java index 4af28313d6..9e627d5ce3 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java @@ -19,11 +19,13 @@ package org.apache.parquet.column.values.deltastrings; import java.io.IOException; +import java.io.OutputStream; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.RequiresPreviousReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; /** @@ -70,7 +72,11 @@ public Binary readBytes() { if (prefixLength != 0) { byte[] out = new byte[length]; System.arraycopy(previous.getBytesUnsafe(), 0, out, 0, prefixLength); - System.arraycopy(suffix.getBytesUnsafe(), 0, out, prefixLength, suffix.length()); + try { + suffix.writeTo(new ByteArraySliceOutputStream(out, prefixLength)); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to materialize delta byte array suffix", e); + } previous = Binary.fromConstantByteArray(out); } else { previous = suffix; @@ -92,4 +98,25 @@ public void setPreviousReader(ValuesReader reader) { this.previous = ((DeltaByteArrayReader) reader).previous; } } + + private static final class ByteArraySliceOutputStream extends OutputStream { + private final byte[] output; + private int position; + + private ByteArraySliceOutputStream(byte[] output, int position) { + this.output = output; + this.position = position; + } + + @Override + public void write(int b) { + output[position++] = (byte) b; + } + + @Override + public void write(byte[] b, int off, int len) { + System.arraycopy(b, off, output, position, len); + position += len; + } + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java index 5496ed1945..38f67a124d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java @@ -38,7 +38,7 @@ public class DeltaByteArrayWriter extends ValuesWriter { private ValuesWriter prefixLengthWriter; - private ValuesWriter suffixWriter; + private DeltaLengthByteArrayValuesWriter suffixWriter; private byte[] previous; public DeltaByteArrayWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { @@ -89,7 +89,10 @@ public String memUsageString(String prefix) { @Override public void writeBytes(Binary v) { - byte[] vb = v.isBackingBytesReused() ? v.getBytes() : v.getBytesUnsafe(); + // copy() is a no-op for constant (non-reused) Binaries, and getBytesUnsafe() + // returns the backing array directly for ByteArrayBackedBinary — avoiding + // the unconditional array copy that getBytes() always performs. + byte[] vb = v.copy().getBytesUnsafe(); int length = Math.min(previous.length, vb.length); // Find the number of matching prefix bytes between this value and the previous one. // Arrays.mismatch is intrinsified by the JVM to use SIMD instructions. @@ -98,7 +101,9 @@ public void writeBytes(Binary v) { i = length; // all bytes in the common range matched } prefixLengthWriter.writeInteger(i); - suffixWriter.writeBytes(v.slice(i, vb.length - i)); + // Write suffix bytes directly from the byte array, avoiding Binary.slice() allocation + // and the virtual dispatch chain through Binary.writeTo() + suffixWriter.writeBytes(vb, i, vb.length - i); previous = vb; } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java index 09a7fc38b2..45b22b40a3 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java @@ -39,12 +39,6 @@ public static String[] getRandomStringSamples(int numSamples, int maxLength) { return samples; } - public static void writeInts(ValuesWriter writer, int[] ints) throws IOException { - for (int i = 0; i < ints.length; i++) { - writer.writeInteger(ints[i]); - } - } - public static void writeData(ValuesWriter writer, String[] strings) throws IOException { for (int i = 0; i < strings.length; i++) { writer.writeBytes(Binary.fromString(strings[i])); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java index e14dc49329..4442082681 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values.deltalengthbytearray; import java.io.IOException; +import java.nio.charset.StandardCharsets; import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.Utils; import org.apache.parquet.column.values.ValuesReader; @@ -98,4 +99,56 @@ public void testLengths() throws IOException { Assert.assertEquals(values[i].length(), bin[i]); } } + + @Test + public void testWriteBytesRawArray() throws IOException { + DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter(); + DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); + + // Write using the raw byte[] overload with various offsets + byte[] buf = "XXparquetXXhadoopXXmapreduceXX".getBytes(StandardCharsets.UTF_8); + writer.writeBytes(buf, 2, 7); // "parquet" + writer.writeBytes(buf, 11, 6); // "hadoop" + writer.writeBytes(buf, 19, 9); // "mapreduce" + + Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length); + + for (int i = 0; i < bin.length; i++) { + Assert.assertEquals(Binary.fromString(values[i]), bin[i]); + } + } + + @Test + public void testWriteBytesRawArrayEmpty() throws IOException { + DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter(); + DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); + + // Write empty byte arrays + writer.writeBytes(new byte[0], 0, 0); + writer.writeBytes("hello".getBytes(StandardCharsets.UTF_8), 0, 5); + writer.writeBytes(new byte[10], 5, 0); // zero-length from middle of array + + reader.initFromPage(3, writer.getBytes().toInputStream()); + Assert.assertEquals(Binary.fromString(""), reader.readBytes()); + Assert.assertEquals(Binary.fromString("hello"), reader.readBytes()); + Assert.assertEquals(Binary.fromString(""), reader.readBytes()); + } + + @Test + public void testWriteBytesRawArrayMatchesBinaryWrite() throws IOException { + // Verify that writeBytes(byte[], offset, length) produces identical + // encoded output to writeBytes(Binary) + DeltaLengthByteArrayValuesWriter writerBinary = getDeltaLengthByteArrayValuesWriter(); + DeltaLengthByteArrayValuesWriter writerRaw = getDeltaLengthByteArrayValuesWriter(); + + String[] testValues = Utils.getRandomStringSamples(500, 64); + for (String s : testValues) { + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + writerBinary.writeBytes(Binary.fromConstantByteArray(bytes)); + writerRaw.writeBytes(bytes, 0, bytes.length); + } + + Assert.assertArrayEquals( + writerBinary.getBytes().toByteArray(), writerRaw.getBytes().toByteArray()); + } } diff --git a/pom.xml b/pom.xml index 1bd9893d87..659ed4a188 100644 --- a/pom.xml +++ b/pom.xml @@ -604,6 +604,7 @@ org.apache.parquet.avro.AvroReadSupport#AVRO_REQUESTED_PROJECTION org.apache.parquet.avro.AvroReadSupport#AVRO_DATA_SUPPLIER org.apache.parquet.hadoop.ParquetFileReader#PARQUET_READ_PARALLELISM + org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter#DeltaBinaryPackingValuesWriter(int,int,org.apache.parquet.bytes.ByteBufferAllocator)