Skip to content

Commit 7aae9be

Browse files
committed
GH-3530: Optimize DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, and DELTA_BYTE_ARRAY encoding/decoding
DELTA_BINARY_PACKED reader:
- Cache BytePackerForLong instances (packerCache) to eliminate repeated factory lookups per mini block
- Add unpack32Values bulk method that processes 32 values per call instead of 8, reducing loop overhead
- Replace ByteBuffer miniBlockByteBuffer with byte[] to avoid ByteBuffer.slice() allocation per mini block and enable the faster byte[]-based packer APIs

DELTA_BINARY_PACKED integer writer:
- Cache BytePackerForLong instances (packerCache)
- Add pack32Values bulk packing method (32 values per call)

DELTA_BINARY_PACKED long writer:
- Cache BytePackerForLong instances (packerCache)
- Add pack32Values bulk packing method (32 values per call)

DELTA_BINARY_PACKED base writer:
- Remove unused 3-argument constructor

DELTA_LENGTH_BYTE_ARRAY writer:
- Remove LittleEndianDataOutputStream wrapper; write directly to CapacityByteArrayOutputStream via BytesUtils
- Add writeBytes(byte[],int,int) overload for direct byte array writes

DELTA_BYTE_ARRAY reader:
- Add ByteArraySliceOutputStream to eliminate temporary byte[] copies when materializing prefix+suffix in readBytes()

DELTA_BYTE_ARRAY writer:
- Use copy().getBytesUnsafe() and direct writeBytes(byte[],int,int) to avoid intermediate Binary allocations
- Use Arrays.mismatch for prefix length computation, which is JVM-intrinsified for SIMD acceleration

Test utilities:
- Remove unused writeInts method from Utils

JMH benchmarks:
- DeltaBinaryPackedEncodingBenchmark: INT32/INT64 scalar encode with SEQUENTIAL, RANDOM, LOW_CARDINALITY, HIGH_CARDINALITY data patterns
- DeltaBinaryPackedDecodingBenchmark: INT32/INT64 scalar decode
- DeltaByteArrayEncodingBenchmark: BINARY/FLBA scalar encode with RANDOM/SORTED data and varying string/fixed lengths
- DeltaByteArrayDecodingBenchmark: BINARY/FLBA scalar decode
- DeltaLengthByteArrayEncodingBenchmark: BINARY scalar encode with UNIFORM_LENGTH/VARIABLE_LENGTH distributions
- DeltaLengthByteArrayDecodingBenchmark: BINARY scalar decode
- LongDeltaDecodingBenchmark: INT64 decode with 5 bit-width patterns (SEQUENTIAL_DENSE, SEQUENTIAL_STRIDED, RANDOM_SMALL, RANDOM_WIDE, TIMESTAMP_MILLIS)
- Shared TestDataFactory for deterministic benchmark data generation
1 parent c7e7acb commit 7aae9be

18 files changed

Lines changed: 1468 additions & 60 deletions

parquet-benchmarks/pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@
5252
<artifactId>parquet-common</artifactId>
5353
<version>${project.version}</version>
5454
</dependency>
55+
<dependency>
56+
<groupId>org.apache.parquet</groupId>
57+
<artifactId>parquet-encoding</artifactId>
58+
<version>${project.version}</version>
59+
</dependency>
5560
<dependency>
5661
<groupId>org.apache.parquet</groupId>
5762
<artifactId>parquet-variant</artifactId>
@@ -94,6 +99,18 @@
9499
<plugin>
95100
<groupId>org.apache.maven.plugins</groupId>
96101
<artifactId>maven-compiler-plugin</artifactId>
102+
<configuration>
103+
<annotationProcessorPaths>
104+
<path>
105+
<groupId>org.openjdk.jmh</groupId>
106+
<artifactId>jmh-generator-annprocess</artifactId>
107+
<version>${jmh.version}</version>
108+
</path>
109+
</annotationProcessorPaths>
110+
<annotationProcessors>
111+
<annotationProcessor>org.openjdk.jmh.generators.BenchmarkProcessor</annotationProcessor>
112+
</annotationProcessors>
113+
</configuration>
97114
</plugin>
98115
<plugin>
99116
<groupId>org.apache.maven.plugins</groupId>
@@ -112,6 +129,12 @@
112129
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
113130
<mainClass>org.openjdk.jmh.Main</mainClass>
114131
</transformer>
132+
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
133+
<resource>META-INF/BenchmarkList</resource>
134+
</transformer>
135+
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
136+
<resource>META-INF/CompilerHints</resource>
137+
</transformer>
115138
</transformers>
116139
<artifactSet>
117140
<includes>
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.benchmarks;
20+
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.parquet.bytes.ByteBufferInputStream;
25+
import org.apache.parquet.bytes.HeapByteBufferAllocator;
26+
import org.apache.parquet.column.values.ValuesWriter;
27+
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
28+
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
29+
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
30+
import org.openjdk.jmh.annotations.Benchmark;
31+
import org.openjdk.jmh.annotations.BenchmarkMode;
32+
import org.openjdk.jmh.annotations.Fork;
33+
import org.openjdk.jmh.annotations.Level;
34+
import org.openjdk.jmh.annotations.Measurement;
35+
import org.openjdk.jmh.annotations.Mode;
36+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
37+
import org.openjdk.jmh.annotations.OutputTimeUnit;
38+
import org.openjdk.jmh.annotations.Param;
39+
import org.openjdk.jmh.annotations.Scope;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.annotations.State;
42+
import org.openjdk.jmh.annotations.Warmup;
43+
import org.openjdk.jmh.infra.Blackhole;
44+
45+
/**
46+
* Decoding-level micro-benchmarks for the DELTA_BINARY_PACKED encoding across
47+
* the Parquet types that support it: {@code INT32} and {@code INT64}.
48+
* Encoding benchmarks live in {@link DeltaBinaryPackedEncodingBenchmark}.
49+
*
50+
* <p>The {@code dataPattern} parameter exercises delta decoding across
51+
* different value distributions: sequential (small constant deltas), random
52+
* (large varying deltas), low-cardinality (many zero deltas from repeated
53+
* values), and high-cardinality (all unique, shuffled).
54+
*
55+
* <p>Each invocation decodes {@value #VALUE_COUNT} values; throughput is
56+
* reported per-value via {@link OperationsPerInvocation}.
57+
*/
58+
@BenchmarkMode(Mode.Throughput)
59+
@OutputTimeUnit(TimeUnit.SECONDS)
60+
@Fork(1)
61+
@Warmup(iterations = 3, time = 1)
62+
@Measurement(iterations = 5, time = 1)
63+
@State(Scope.Thread)
64+
public class DeltaBinaryPackedDecodingBenchmark {
65+
66+
static final int VALUE_COUNT = 100_000;
67+
private static final int INIT_SLAB_SIZE = 64 * 1024;
68+
private static final int PAGE_SIZE = 1024 * 1024;
69+
70+
@Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "HIGH_CARDINALITY"})
71+
public String dataPattern;
72+
73+
private byte[] intPage;
74+
private byte[] longPage;
75+
76+
@Setup(Level.Trial)
77+
public void setup() throws IOException {
78+
long seed = TestDataFactory.DEFAULT_SEED;
79+
int distinct = TestDataFactory.LOW_CARDINALITY_DISTINCT;
80+
81+
int[] intData;
82+
long[] longData;
83+
84+
switch (dataPattern) {
85+
case "SEQUENTIAL":
86+
intData = TestDataFactory.generateSequentialInts(VALUE_COUNT);
87+
longData = TestDataFactory.generateSequentialLongs(VALUE_COUNT);
88+
break;
89+
case "RANDOM":
90+
intData = TestDataFactory.generateRandomInts(VALUE_COUNT, seed);
91+
longData = TestDataFactory.generateRandomLongs(VALUE_COUNT, seed);
92+
break;
93+
case "LOW_CARDINALITY":
94+
intData = TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, seed);
95+
longData = TestDataFactory.generateLowCardinalityLongs(VALUE_COUNT, distinct, seed);
96+
break;
97+
case "HIGH_CARDINALITY":
98+
intData = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, seed);
99+
longData = TestDataFactory.generateHighCardinalityLongs(VALUE_COUNT, seed);
100+
break;
101+
default:
102+
throw new IllegalArgumentException("Unknown data pattern: " + dataPattern);
103+
}
104+
105+
// Pre-encode pages for decode benchmarks
106+
{
107+
ValuesWriter w = new DeltaBinaryPackingValuesWriterForInteger(
108+
INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
109+
for (int v : intData) {
110+
w.writeInteger(v);
111+
}
112+
intPage = w.getBytes().toByteArray();
113+
w.close();
114+
}
115+
{
116+
ValuesWriter w =
117+
new DeltaBinaryPackingValuesWriterForLong(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
118+
for (long v : longData) {
119+
w.writeLong(v);
120+
}
121+
longPage = w.getBytes().toByteArray();
122+
w.close();
123+
}
124+
}
125+
126+
// ---- INT32 ----
127+
128+
@Benchmark
129+
@OperationsPerInvocation(VALUE_COUNT)
130+
public void decodeInt(Blackhole bh) throws IOException {
131+
DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader();
132+
reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(intPage)));
133+
for (int i = 0; i < VALUE_COUNT; i++) {
134+
bh.consume(reader.readInteger());
135+
}
136+
}
137+
138+
// ---- INT64 ----
139+
140+
@Benchmark
141+
@OperationsPerInvocation(VALUE_COUNT)
142+
public void decodeLong(Blackhole bh) throws IOException {
143+
DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader();
144+
reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(longPage)));
145+
for (int i = 0; i < VALUE_COUNT; i++) {
146+
bh.consume(reader.readLong());
147+
}
148+
}
149+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.benchmarks;
20+
21+
import java.io.IOException;
22+
import java.util.concurrent.TimeUnit;
23+
import org.apache.parquet.bytes.HeapByteBufferAllocator;
24+
import org.apache.parquet.column.values.ValuesWriter;
25+
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
26+
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
27+
import org.openjdk.jmh.annotations.Benchmark;
28+
import org.openjdk.jmh.annotations.BenchmarkMode;
29+
import org.openjdk.jmh.annotations.Fork;
30+
import org.openjdk.jmh.annotations.Level;
31+
import org.openjdk.jmh.annotations.Measurement;
32+
import org.openjdk.jmh.annotations.Mode;
33+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
34+
import org.openjdk.jmh.annotations.OutputTimeUnit;
35+
import org.openjdk.jmh.annotations.Param;
36+
import org.openjdk.jmh.annotations.Scope;
37+
import org.openjdk.jmh.annotations.Setup;
38+
import org.openjdk.jmh.annotations.State;
39+
import org.openjdk.jmh.annotations.Warmup;
40+
41+
/**
42+
* Encoding-level micro-benchmarks for the DELTA_BINARY_PACKED encoding across
43+
* the Parquet types that support it: {@code INT32} and {@code INT64}.
44+
* Decoding benchmarks live in {@link DeltaBinaryPackedDecodingBenchmark}.
45+
*
46+
* <p>The {@code dataPattern} parameter exercises delta encoding across
47+
* different value distributions: sequential (small constant deltas), random
48+
* (large varying deltas), low-cardinality (many zero deltas from repeated
49+
* values), and high-cardinality (all unique, shuffled).
50+
*
51+
* <p>Each invocation encodes {@value #VALUE_COUNT} values; throughput is
52+
* reported per-value via {@link OperationsPerInvocation}.
53+
*/
54+
@BenchmarkMode(Mode.Throughput)
55+
@OutputTimeUnit(TimeUnit.SECONDS)
56+
@Fork(1)
57+
@Warmup(iterations = 3, time = 1)
58+
@Measurement(iterations = 5, time = 1)
59+
@State(Scope.Thread)
60+
public class DeltaBinaryPackedEncodingBenchmark {
61+
62+
static final int VALUE_COUNT = 100_000;
63+
private static final int INIT_SLAB_SIZE = 64 * 1024;
64+
private static final int PAGE_SIZE = 1024 * 1024;
65+
66+
@Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "HIGH_CARDINALITY"})
67+
public String dataPattern;
68+
69+
private int[] intData;
70+
private long[] longData;
71+
72+
@Setup(Level.Trial)
73+
public void setup() {
74+
long seed = TestDataFactory.DEFAULT_SEED;
75+
int distinct = TestDataFactory.LOW_CARDINALITY_DISTINCT;
76+
77+
switch (dataPattern) {
78+
case "SEQUENTIAL":
79+
intData = TestDataFactory.generateSequentialInts(VALUE_COUNT);
80+
longData = TestDataFactory.generateSequentialLongs(VALUE_COUNT);
81+
break;
82+
case "RANDOM":
83+
intData = TestDataFactory.generateRandomInts(VALUE_COUNT, seed);
84+
longData = TestDataFactory.generateRandomLongs(VALUE_COUNT, seed);
85+
break;
86+
case "LOW_CARDINALITY":
87+
intData = TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, seed);
88+
longData = TestDataFactory.generateLowCardinalityLongs(VALUE_COUNT, distinct, seed);
89+
break;
90+
case "HIGH_CARDINALITY":
91+
intData = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, seed);
92+
longData = TestDataFactory.generateHighCardinalityLongs(VALUE_COUNT, seed);
93+
break;
94+
default:
95+
throw new IllegalArgumentException("Unknown data pattern: " + dataPattern);
96+
}
97+
}
98+
99+
// ---- Writer factories ----
100+
101+
private static DeltaBinaryPackingValuesWriterForInteger newIntWriter() {
102+
return new DeltaBinaryPackingValuesWriterForInteger(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
103+
}
104+
105+
private static DeltaBinaryPackingValuesWriterForLong newLongWriter() {
106+
return new DeltaBinaryPackingValuesWriterForLong(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
107+
}
108+
109+
// ---- INT32 ----
110+
111+
@Benchmark
112+
@OperationsPerInvocation(VALUE_COUNT)
113+
public byte[] encodeInt() throws IOException {
114+
ValuesWriter w = newIntWriter();
115+
for (int v : intData) {
116+
w.writeInteger(v);
117+
}
118+
byte[] bytes = w.getBytes().toByteArray();
119+
w.close();
120+
return bytes;
121+
}
122+
123+
// ---- INT64 ----
124+
125+
@Benchmark
126+
@OperationsPerInvocation(VALUE_COUNT)
127+
public byte[] encodeLong() throws IOException {
128+
ValuesWriter w = newLongWriter();
129+
for (long v : longData) {
130+
w.writeLong(v);
131+
}
132+
byte[] bytes = w.getBytes().toByteArray();
133+
w.close();
134+
return bytes;
135+
}
136+
}

0 commit comments

Comments
 (0)