From 399c7bba6e24358c74dea68fe6650dbf8d09f618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 17 May 2026 12:59:47 +0200 Subject: [PATCH] GH-3530: Optimize DICTIONARY encoding/decoding data structures and use ByteBuffer Encoding improvements: - Replace LinkedOpenHashMap with OpenHashMap + ArrayList for all DictionaryValuesWriter subclasses, eliminating insertion-order overhead and enabling O(1) indexed access for dictionary page serialization and fallback - Make IntList.size() O(1) by tracking totalSize incrementally instead of summing across slab arrays Decoding improvements: - Convert PlainValuesDictionary numeric constructors (INT32, INT64, FLOAT, DOUBLE) from InputStream-based per-byte reads to direct ByteBuffer.getInt/getLong/getFloat/getDouble (JVM intrinsics) Binary hashCode caching: - Cache hashCode() for Binary instances that are not backed by reusable byte arrays, avoiding redundant recomputation during dictionary lookups (hash map probes) JMH benchmarks: - DictionaryEncodingBenchmark: scalar encoding for INT32, INT64, FLOAT, DOUBLE, BINARY, and FIXED_LEN_BYTE_ARRAY with LOW/HIGH cardinality and variable-length string/FLBA dimensions - DictionaryDecodingBenchmark: scalar decoding for all types with matching parameterization - TestDataFactory: shared data generation utility for reproducible benchmark inputs - BenchmarkEncodingUtils: helper to drain DictionaryValuesWriter into encoded dictionary page + data bytes for decoder setup --- parquet-benchmarks/pom.xml | 9 + .../benchmarks/BenchmarkEncodingUtils.java | 70 ++++ .../DictionaryDecodingBenchmark.java | 351 ++++++++++++++++++ .../DictionaryEncodingBenchmark.java | 236 ++++++++++++ .../parquet/benchmarks/TestDataFactory.java | 230 ++++++++++++ .../dictionary/DictionaryValuesWriter.java | 140 +++---- .../column/values/dictionary/IntList.java | 9 +- .../dictionary/PlainValuesDictionary.java | 33 +- .../org/apache/parquet/io/api/Binary.java | 47 ++- 
.../column/values/dictionary/IntListTest.java | 7 + .../values/dictionary/TestDictionary.java | 75 +++- .../org/apache/parquet/io/api/TestBinary.java | 45 +++ 12 files changed, 1133 insertions(+), 119 deletions(-) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index d5a288b677..3ce4dce5ce 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -94,6 +94,15 @@ org.apache.maven.plugins maven-compiler-plugin + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + org.apache.maven.plugins diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java new file mode 100644 index 0000000000..c79dedce28 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; + +/** + * Shared helpers for encode/decode micro-benchmarks. + */ +final class BenchmarkEncodingUtils { + + private BenchmarkEncodingUtils() {} + + /** + * Container for the two artefacts produced by a dictionary-encoded page: + * the encoded dictionary indices ({@link #dictData}) and the dictionary + * page itself ({@link #dictPage}). The dictionary page may be {@code null} + * if the writer fell back to plain encoding (for example, when the + * dictionary exceeded its configured maximum size). + */ + static final class EncodedDictionary { + final byte[] dictData; + final DictionaryPage dictPage; + + EncodedDictionary(byte[] dictData, DictionaryPage dictPage) { + this.dictData = dictData; + this.dictPage = dictPage; + } + + boolean fellBackToPlain() { + return dictPage == null; + } + } + + /** + * Drains a {@link DictionaryValuesWriter} into an {@link EncodedDictionary}. + * + *

<p>The writer's data bytes (the RLE-encoded indices) and the dictionary + * page are returned separately so both pieces can be measured or fed to a + * decoder symmetrically. The dictionary page buffer is copied so it remains + * valid after the writer's allocator is released. + * + *

The writer is closed via {@code toDictPageAndClose()}; callers must not + * call {@link DictionaryValuesWriter#close()} again afterwards. + */ + static EncodedDictionary drainDictionary(DictionaryValuesWriter writer) throws IOException { + byte[] dictData = writer.getBytes().toByteArray(); + DictionaryPage rawPage = writer.toDictPageAndClose(); + DictionaryPage dictPage = rawPage == null ? null : rawPage.copy(); + return new EncodedDictionary(dictData, dictPage); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryDecodingBenchmark.java new file mode 100644 index 0000000000..8b05893933 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryDecodingBenchmark.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; +import org.apache.parquet.column.values.dictionary.PlainValuesDictionary; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding-level micro-benchmarks for the DICTIONARY encoding across all Parquet + * types that support it: {@code INT32}, {@code INT64}, {@code FLOAT}, + * {@code DOUBLE}, {@code BINARY}, and {@code FIXED_LEN_BYTE_ARRAY}. + * Encoding benchmarks live in {@link DictionaryEncodingBenchmark}. + * + *

<p>Each type group ({@code NumericState}, {@code BinaryState}, {@code FlbaState}) has its own + * inner {@link State} class with independent {@code @Param} dimensions to avoid JMH cross-product pollution. + *

+ * + *

Decode benchmarks measure the {@link DictionaryValuesReader} lookup path. + * Each invocation decodes {@value #VALUE_COUNT} values; throughput is reported + * per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +public class DictionaryDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int MAX_DICT_BYTE_SIZE = 4 * 1024 * 1024; + + // ==== Fixed-width numeric types (parameterised by dataPattern) ==== + + @State(Scope.Thread) + public static class NumericState { + @Param({"LOW_CARDINALITY", "HIGH_CARDINALITY"}) + public String dataPattern; + + // Pre-encoded dictionary pages + byte[] intDictDataEncoded; + Dictionary intDictionary; + boolean intDictAvailable; + + byte[] longDictDataEncoded; + Dictionary longDictionary; + boolean longDictAvailable; + + byte[] floatDictDataEncoded; + Dictionary floatDictionary; + boolean floatDictAvailable; + + byte[] doubleDictDataEncoded; + Dictionary doubleDictionary; + boolean doubleDictAvailable; + + @Setup(Level.Trial) + public void setup() throws IOException { + int distinct = "LOW_CARDINALITY".equals(dataPattern) + ? 
TestDataFactory.LOW_CARDINALITY_DISTINCT + : 0; // 0 = all unique for HIGH_CARDINALITY + + long seed = TestDataFactory.DEFAULT_SEED; + + int[] intData; + long[] longData; + float[] floatData; + double[] doubleData; + + if (distinct > 0) { + intData = TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, seed); + longData = TestDataFactory.generateLowCardinalityLongs(VALUE_COUNT, distinct, seed); + floatData = TestDataFactory.generateLowCardinalityFloats(VALUE_COUNT, distinct, seed); + doubleData = TestDataFactory.generateLowCardinalityDoubles(VALUE_COUNT, distinct, seed); + } else { + intData = TestDataFactory.generateRandomInts(VALUE_COUNT, seed); + longData = TestDataFactory.generateRandomLongs(VALUE_COUNT, seed); + floatData = TestDataFactory.generateRandomFloats(VALUE_COUNT, seed); + doubleData = TestDataFactory.generateRandomDoubles(VALUE_COUNT, seed); + } + + setupIntDict(intData); + setupLongDict(longData); + setupFloatDict(floatData); + setupDoubleDict(doubleData); + } + + private void setupIntDict(int[] data) throws IOException { + DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (int v : data) { + w.writeInteger(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + intDictDataEncoded = enc.dictData; + intDictAvailable = !enc.fellBackToPlain(); + if (intDictAvailable) { + intDictionary = new PlainValuesDictionary.PlainIntegerDictionary(enc.dictPage); + } + } + + private void setupLongDict(long[] data) throws IOException { + DictionaryValuesWriter.PlainLongDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainLongDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (long v : data) { + w.writeLong(v); + } + 
BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + longDictDataEncoded = enc.dictData; + longDictAvailable = !enc.fellBackToPlain(); + if (longDictAvailable) { + longDictionary = new PlainValuesDictionary.PlainLongDictionary(enc.dictPage); + } + } + + private void setupFloatDict(float[] data) throws IOException { + DictionaryValuesWriter.PlainFloatDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainFloatDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (float v : data) { + w.writeFloat(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + floatDictDataEncoded = enc.dictData; + floatDictAvailable = !enc.fellBackToPlain(); + if (floatDictAvailable) { + floatDictionary = new PlainValuesDictionary.PlainFloatDictionary(enc.dictPage); + } + } + + private void setupDoubleDict(double[] data) throws IOException { + DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (double v : data) { + w.writeDouble(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + doubleDictDataEncoded = enc.dictData; + doubleDictAvailable = !enc.fellBackToPlain(); + if (doubleDictAvailable) { + doubleDictionary = new PlainValuesDictionary.PlainDoubleDictionary(enc.dictPage); + } + } + } + + // ---- Fixed-width numeric decode benchmarks ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeInt(NumericState state, Blackhole bh) throws IOException { + if (!state.intDictAvailable) return; + DictionaryValuesReader r = new DictionaryValuesReader(state.intDictionary); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.intDictDataEncoded))); + for 
(int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readInteger()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeLong(NumericState state, Blackhole bh) throws IOException { + if (!state.longDictAvailable) return; + DictionaryValuesReader r = new DictionaryValuesReader(state.longDictionary); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.longDictDataEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readLong()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeFloat(NumericState state, Blackhole bh) throws IOException { + if (!state.floatDictAvailable) return; + DictionaryValuesReader r = new DictionaryValuesReader(state.floatDictionary); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.floatDictDataEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readFloat()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDouble(NumericState state, Blackhole bh) throws IOException { + if (!state.doubleDictAvailable) return; + DictionaryValuesReader r = new DictionaryValuesReader(state.doubleDictionary); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.doubleDictDataEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readDouble()); + } + } + + // ==== BINARY (parameterised by stringLength and cardinality) ==== + + @State(Scope.Thread) + public static class BinaryState { + @Param({"10", "100", "1000"}) + public int stringLength; + + @Param({"LOW", "HIGH"}) + public String cardinality; + + byte[] dictData; + Dictionary dictionary; + boolean dictAvailable; + + @Setup(Level.Trial) + public void setup() throws IOException { + int distinct = "LOW".equals(cardinality) ? 
TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; + Binary[] data = TestDataFactory.generateBinaryData( + VALUE_COUNT, stringLength, distinct, TestDataFactory.DEFAULT_SEED); + + DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + dictData = enc.dictData; + dictAvailable = !enc.fellBackToPlain(); + if (dictAvailable) { + dictionary = new PlainValuesDictionary.PlainBinaryDictionary(enc.dictPage); + } + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBinary(BinaryState state, Blackhole bh) throws IOException { + if (!state.dictAvailable) return; + DictionaryValuesReader r = new DictionaryValuesReader(state.dictionary); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.dictData))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readBytes()); + } + } + + // ==== FIXED_LEN_BYTE_ARRAY (parameterised by fixedLength and cardinality) ==== + + @State(Scope.Thread) + public static class FlbaState { + @Param({"2", "12", "16"}) + public int fixedLength; + + @Param({"LOW", "HIGH"}) + public String cardinality; + + byte[] dictData; + Dictionary dictionary; + boolean dictAvailable; + + @Setup(Level.Trial) + public void setup() throws IOException { + int distinct = "LOW".equals(cardinality) ? 
TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; + Binary[] data = TestDataFactory.generateFixedLenByteArrays( + VALUE_COUNT, fixedLength, distinct, TestDataFactory.DEFAULT_SEED); + + DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + fixedLength, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + dictData = enc.dictData; + dictAvailable = !enc.fellBackToPlain(); + if (dictAvailable) { + dictionary = new PlainValuesDictionary.PlainBinaryDictionary(enc.dictPage, fixedLength); + } + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeFlba(FlbaState state, Blackhole bh) throws IOException { + if (!state.dictAvailable) return; + DictionaryValuesReader r = new DictionaryValuesReader(state.dictionary); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.dictData))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readBytes()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryEncodingBenchmark.java new file mode 100644 index 0000000000..1549205d3a --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DictionaryEncodingBenchmark.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Encoding-level micro-benchmarks for the DICTIONARY encoding across all Parquet + * types that support it: {@code INT32}, {@code INT64}, {@code FLOAT}, + * {@code DOUBLE}, {@code BINARY}, and {@code FIXED_LEN_BYTE_ARRAY}. + * Decoding benchmarks live in {@link DictionaryDecodingBenchmark}. + * + *

<p>Each type group ({@code NumericState}, {@code BinaryState}, {@code FlbaState}) has its own + * inner {@link State} class with independent {@code @Param} dimensions to avoid JMH cross-product pollution. + *

+ * + *

Each type's encode benchmark measures the full dictionary-build path + * (type-specific hash map + id append). Each invocation encodes + * {@value #VALUE_COUNT} values; throughput is reported per-value via + * {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +public class DictionaryEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int MAX_DICT_BYTE_SIZE = 4 * 1024 * 1024; + + // ==== Fixed-width numeric types (parameterised by dataPattern) ==== + + @State(Scope.Thread) + public static class NumericState { + @Param({"LOW_CARDINALITY", "HIGH_CARDINALITY"}) + public String dataPattern; + + int[] intData; + long[] longData; + float[] floatData; + double[] doubleData; + + @Setup(Level.Trial) + public void setup() { + int distinct = "LOW_CARDINALITY".equals(dataPattern) + ? TestDataFactory.LOW_CARDINALITY_DISTINCT + : 0; // 0 = all unique for HIGH_CARDINALITY + + long seed = TestDataFactory.DEFAULT_SEED; + + if (distinct > 0) { + intData = TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, seed); + longData = TestDataFactory.generateLowCardinalityLongs(VALUE_COUNT, distinct, seed); + floatData = TestDataFactory.generateLowCardinalityFloats(VALUE_COUNT, distinct, seed); + doubleData = TestDataFactory.generateLowCardinalityDoubles(VALUE_COUNT, distinct, seed); + } else { + intData = TestDataFactory.generateRandomInts(VALUE_COUNT, seed); + longData = TestDataFactory.generateRandomLongs(VALUE_COUNT, seed); + floatData = TestDataFactory.generateRandomFloats(VALUE_COUNT, seed); + doubleData = TestDataFactory.generateRandomDoubles(VALUE_COUNT, seed); + } + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeInt(NumericState state, Blackhole bh) throws IOException { + DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter w = + new 
DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + for (int v : state.intData) { + w.writeInteger(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + bh.consume(enc.dictData); + bh.consume(enc.dictPage); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeLong(NumericState state, Blackhole bh) throws IOException { + DictionaryValuesWriter.PlainLongDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainLongDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + for (long v : state.longData) { + w.writeLong(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + bh.consume(enc.dictData); + bh.consume(enc.dictPage); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeFloat(NumericState state, Blackhole bh) throws IOException { + DictionaryValuesWriter.PlainFloatDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainFloatDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + for (float v : state.floatData) { + w.writeFloat(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + bh.consume(enc.dictData); + bh.consume(enc.dictPage); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeDouble(NumericState state, Blackhole bh) throws IOException { + DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + for (double v : state.doubleData) { + w.writeDouble(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = 
BenchmarkEncodingUtils.drainDictionary(w); + bh.consume(enc.dictData); + bh.consume(enc.dictPage); + } + + // ==== BINARY (parameterised by stringLength and cardinality) ==== + + @State(Scope.Thread) + public static class BinaryState { + @Param({"10", "100", "1000"}) + public int stringLength; + + @Param({"LOW", "HIGH"}) + public String cardinality; + + Binary[] data; + + @Setup(Level.Trial) + public void setup() { + int distinct = "LOW".equals(cardinality) ? TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; + data = TestDataFactory.generateBinaryData( + VALUE_COUNT, stringLength, distinct, TestDataFactory.DEFAULT_SEED); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeBinary(BinaryState state, Blackhole bh) throws IOException { + DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + for (Binary v : state.data) { + w.writeBytes(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + bh.consume(enc.dictData); + bh.consume(enc.dictPage); + } + + // ==== FIXED_LEN_BYTE_ARRAY (parameterised by fixedLength and cardinality) ==== + + @State(Scope.Thread) + public static class FlbaState { + @Param({"2", "12", "16"}) + public int fixedLength; + + @Param({"LOW", "HIGH"}) + public String cardinality; + + Binary[] data; + + @Setup(Level.Trial) + public void setup() { + int distinct = "LOW".equals(cardinality) ? 
TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; + data = TestDataFactory.generateFixedLenByteArrays( + VALUE_COUNT, fixedLength, distinct, TestDataFactory.DEFAULT_SEED); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeFlba(FlbaState state, Blackhole bh) throws IOException { + DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + state.fixedLength, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (Binary v : state.data) { + w.writeBytes(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + bh.consume(enc.dictData); + bh.consume(enc.dictPage); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java new file mode 100644 index 0000000000..6bd19a0148 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.nio.charset.StandardCharsets; +import java.util.Random; +import org.apache.parquet.io.api.Binary; + +/** + * Utility class for generating test data for dictionary encoding benchmarks. + */ +public final class TestDataFactory { + + /** Number of distinct values for low-cardinality data patterns. */ + public static final int LOW_CARDINALITY_DISTINCT = 100; + + /** Default RNG seed used across benchmarks for deterministic data. */ + public static final long DEFAULT_SEED = 42L; + + private TestDataFactory() {} + + // ---- Integer data generation for encoding benchmarks ---- + + /** + * Generates uniformly random integers using the given seed. + */ + public static int[] generateRandomInts(int count, long seed) { + Random random = new Random(seed); + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(); + } + return data; + } + + /** + * Generates low-cardinality integers (values drawn from a small set) using the given seed. + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) { + Random random = new Random(seed); + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(distinctValues); + } + return data; + } + + // ---- Long data generation for encoding benchmarks ---- + + /** + * Generates uniformly random longs using the given seed. + */ + public static long[] generateRandomLongs(int count, long seed) { + Random random = new Random(seed); + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextLong(); + } + return data; + } + + /** + * Generates low-cardinality longs (values drawn from a small set). 
+ */ + public static long[] generateLowCardinalityLongs(int count, int distinctValues, long seed) { + Random random = new Random(seed); + long[] palette = new long[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextLong(); + } + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + // ---- Float data generation for encoding benchmarks ---- + + /** + * Generates uniformly random floats using the given seed. + */ + public static float[] generateRandomFloats(int count, long seed) { + Random random = new Random(seed); + float[] data = new float[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextFloat() * 1000.0f; + } + return data; + } + + /** + * Generates low-cardinality floats (values drawn from a small set). + */ + public static float[] generateLowCardinalityFloats(int count, int distinctValues, long seed) { + Random random = new Random(seed); + float[] palette = new float[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextFloat() * 1000.0f; + } + float[] data = new float[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + // ---- Double data generation for encoding benchmarks ---- + + /** + * Generates uniformly random doubles using the given seed. + */ + public static double[] generateRandomDoubles(int count, long seed) { + Random random = new Random(seed); + double[] data = new double[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextDouble() * 1000.0; + } + return data; + } + + /** + * Generates low-cardinality doubles (values drawn from a small set). 
+ */ + public static double[] generateLowCardinalityDoubles(int count, int distinctValues, long seed) { + Random random = new Random(seed); + double[] palette = new double[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextDouble() * 1000.0; + } + double[] data = new double[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + // ---- Fixed-length byte array data generation for encoding benchmarks ---- + + /** + * Generates fixed-length byte arrays with the specified cardinality. + * + * @param count number of values + * @param length byte length of each value + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + */ + public static Binary[] generateFixedLenByteArrays(int count, int length, int distinct, long seed) { + Random random = new Random(seed); + if (distinct > 0) { + Binary[] palette = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + palette[i] = Binary.fromConstantByteArray(bytes); + } + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinct)]; + } + return data; + } else { + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + data[i] = Binary.fromConstantByteArray(bytes); + } + return data; + } + } + + // ---- Binary data generation for encoding benchmarks ---- + + /** + * Generates binary strings of the given length with the specified cardinality. 
+ * + * @param count number of values + * @param stringLength length of each string + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + * @return array of Binary values + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) { + Random random = new Random(seed); + Binary[] data = new Binary[count]; + if (distinct > 0) { + // Pre-generate the distinct values + Binary[] dictionary = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + dictionary[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + for (int i = 0; i < count; i++) { + data[i] = dictionary[random.nextInt(distinct)]; + } + } else { + // All unique + for (int i = 0; i < count; i++) { + data[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + } + return data; + } + + private static String randomString(int length, Random random) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('a' + random.nextInt(26))); + } + return sb.toString(); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java index 53526ae8d0..dbca68e886 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -20,23 +20,22 @@ import static org.apache.parquet.bytes.BytesInput.concat; -import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap; import it.unimi.dsi.fastutil.doubles.Double2IntMap; -import it.unimi.dsi.fastutil.doubles.DoubleIterator; -import it.unimi.dsi.fastutil.floats.Float2IntLinkedOpenHashMap; +import 
it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import it.unimi.dsi.fastutil.floats.Float2IntMap; -import it.unimi.dsi.fastutil.floats.FloatIterator; -import it.unimi.dsi.fastutil.ints.Int2IntLinkedOpenHashMap; +import it.unimi.dsi.fastutil.floats.Float2IntOpenHashMap; +import it.unimi.dsi.fastutil.floats.FloatArrayList; import it.unimi.dsi.fastutil.ints.Int2IntMap; -import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.longs.Long2IntMap; -import it.unimi.dsi.fastutil.longs.LongIterator; -import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.Object2IntMap; -import it.unimi.dsi.fastutil.objects.ObjectIterator; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; @@ -231,7 +230,8 @@ public String memUsageString(String prefix) { public static class PlainBinaryDictionaryValuesWriter extends DictionaryValuesWriter { /* type specific dictionary content */ - protected Object2IntMap binaryDictionaryContent = new Object2IntLinkedOpenHashMap<>(); + protected Object2IntMap binaryDictionaryContent = new Object2IntOpenHashMap<>(); + protected List dictionaryValues = new ArrayList<>(); public PlainBinaryDictionaryValuesWriter( int maxDictionaryByteSize, @@ -246,8 +246,10 @@ public PlainBinaryDictionaryValuesWriter( public void writeBytes(Binary v) { int id = binaryDictionaryContent.getInt(v); if (id == -1) { - id = binaryDictionaryContent.size(); - binaryDictionaryContent.put(v.copy(), id); + id = 
dictionaryValues.size(); + Binary copied = v.copy(); + binaryDictionaryContent.put(copied, id); + dictionaryValues.add(copied); // length as int (4 bytes) + actual bytes dictionaryByteSize += 4L + v.length(); } @@ -260,12 +262,9 @@ public DictionaryPage toDictPageAndClose() { // return a dictionary only if we actually used it PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); - Iterator binaryIterator = - binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { - Binary entry = binaryIterator.next(); - dictionaryEncoder.writeBytes(entry); + dictionaryEncoder.writeBytes(dictionaryValues.get(i)); } return dictPage(dictionaryEncoder); } @@ -280,21 +279,16 @@ public int getDictionarySize() { @Override protected void clearDictionaryContent() { binaryDictionaryContent.clear(); + dictionaryValues.clear(); } @Override public void fallBackDictionaryEncodedData(ValuesWriter writer) { - // build reverse dictionary - Binary[] reverseDictionary = new Binary[getDictionarySize()]; - for (Object2IntMap.Entry entry : binaryDictionaryContent.object2IntEntrySet()) { - reverseDictionary[entry.getIntValue()] = entry.getKey(); - } - - // fall back to plain encoding + // fall back to plain encoding using the ordered dictionary values list IntIterator iterator = encodedValues.iterator(); while (iterator.hasNext()) { int id = iterator.next(); - writer.writeBytes(reverseDictionary[id]); + writer.writeBytes(dictionaryValues.get(id)); } } } @@ -317,8 +311,10 @@ public PlainFixedLenArrayDictionaryValuesWriter( public void writeBytes(Binary value) { int id = binaryDictionaryContent.getInt(value); if (id == -1) { - id = binaryDictionaryContent.size(); - binaryDictionaryContent.put(value.copy(), id); + id = dictionaryValues.size(); + Binary copied = value.copy(); + binaryDictionaryContent.put(copied, id); + dictionaryValues.add(copied); 
dictionaryByteSize += length; } encodedValues.add(id); @@ -330,12 +326,9 @@ public DictionaryPage toDictPageAndClose() { // return a dictionary only if we actually used it FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter( length, lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); - Iterator binaryIterator = - binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { - Binary entry = binaryIterator.next(); - dictionaryEncoder.writeBytes(entry); + dictionaryEncoder.writeBytes(dictionaryValues.get(i)); } return dictPage(dictionaryEncoder); } @@ -346,7 +339,8 @@ public DictionaryPage toDictPageAndClose() { public static class PlainLongDictionaryValuesWriter extends DictionaryValuesWriter { /* type specific dictionary content */ - private Long2IntMap longDictionaryContent = new Long2IntLinkedOpenHashMap(); + private Long2IntMap longDictionaryContent = new Long2IntOpenHashMap(); + private LongArrayList dictionaryValues = new LongArrayList(); public PlainLongDictionaryValuesWriter( int maxDictionaryByteSize, @@ -361,8 +355,9 @@ public PlainLongDictionaryValuesWriter( public void writeLong(long v) { int id = longDictionaryContent.get(v); if (id == -1) { - id = longDictionaryContent.size(); + id = dictionaryValues.size(); longDictionaryContent.put(v, id); + dictionaryValues.add(v); dictionaryByteSize += 8; } encodedValues.add(id); @@ -374,10 +369,9 @@ public DictionaryPage toDictPageAndClose() { // return a dictionary only if we actually used it PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); - LongIterator longIterator = longDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { - dictionaryEncoder.writeLong(longIterator.nextLong()); + 
dictionaryEncoder.writeLong(dictionaryValues.getLong(i)); } return dictPage(dictionaryEncoder); } @@ -392,24 +386,16 @@ public int getDictionarySize() { @Override protected void clearDictionaryContent() { longDictionaryContent.clear(); + dictionaryValues.clear(); } @Override public void fallBackDictionaryEncodedData(ValuesWriter writer) { - // build reverse dictionary - long[] reverseDictionary = new long[getDictionarySize()]; - ObjectIterator entryIterator = - longDictionaryContent.long2IntEntrySet().iterator(); - while (entryIterator.hasNext()) { - Long2IntMap.Entry entry = entryIterator.next(); - reverseDictionary[entry.getIntValue()] = entry.getLongKey(); - } - // fall back to plain encoding IntIterator iterator = encodedValues.iterator(); while (iterator.hasNext()) { int id = iterator.next(); - writer.writeLong(reverseDictionary[id]); + writer.writeLong(dictionaryValues.getLong(id)); } } } @@ -417,7 +403,8 @@ public void fallBackDictionaryEncodedData(ValuesWriter writer) { public static class PlainDoubleDictionaryValuesWriter extends DictionaryValuesWriter { /* type specific dictionary content */ - private Double2IntMap doubleDictionaryContent = new Double2IntLinkedOpenHashMap(); + private Double2IntMap doubleDictionaryContent = new Double2IntOpenHashMap(); + private DoubleArrayList dictionaryValues = new DoubleArrayList(); public PlainDoubleDictionaryValuesWriter( int maxDictionaryByteSize, @@ -432,8 +419,9 @@ public PlainDoubleDictionaryValuesWriter( public void writeDouble(double v) { int id = doubleDictionaryContent.get(v); if (id == -1) { - id = doubleDictionaryContent.size(); + id = dictionaryValues.size(); doubleDictionaryContent.put(v, id); + dictionaryValues.add(v); dictionaryByteSize += 8; } encodedValues.add(id); @@ -445,10 +433,9 @@ public DictionaryPage toDictPageAndClose() { // return a dictionary only if we actually used it PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); 
- DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { - dictionaryEncoder.writeDouble(doubleIterator.nextDouble()); + dictionaryEncoder.writeDouble(dictionaryValues.getDouble(i)); } return dictPage(dictionaryEncoder); } @@ -463,24 +450,16 @@ public int getDictionarySize() { @Override protected void clearDictionaryContent() { doubleDictionaryContent.clear(); + dictionaryValues.clear(); } @Override public void fallBackDictionaryEncodedData(ValuesWriter writer) { - // build reverse dictionary - double[] reverseDictionary = new double[getDictionarySize()]; - ObjectIterator entryIterator = - doubleDictionaryContent.double2IntEntrySet().iterator(); - while (entryIterator.hasNext()) { - Double2IntMap.Entry entry = entryIterator.next(); - reverseDictionary[entry.getIntValue()] = entry.getDoubleKey(); - } - // fall back to plain encoding IntIterator iterator = encodedValues.iterator(); while (iterator.hasNext()) { int id = iterator.next(); - writer.writeDouble(reverseDictionary[id]); + writer.writeDouble(dictionaryValues.getDouble(id)); } } } @@ -488,7 +467,8 @@ public void fallBackDictionaryEncodedData(ValuesWriter writer) { public static class PlainIntegerDictionaryValuesWriter extends DictionaryValuesWriter { /* type specific dictionary content */ - private Int2IntMap intDictionaryContent = new Int2IntLinkedOpenHashMap(); + private Int2IntMap intDictionaryContent = new Int2IntOpenHashMap(); + private IntArrayList dictionaryValues = new IntArrayList(); public PlainIntegerDictionaryValuesWriter( int maxDictionaryByteSize, @@ -503,8 +483,9 @@ public PlainIntegerDictionaryValuesWriter( public void writeInteger(int v) { int id = intDictionaryContent.get(v); if (id == -1) { - id = intDictionaryContent.size(); + id = dictionaryValues.size(); intDictionaryContent.put(v, id); + dictionaryValues.add(v); dictionaryByteSize += 4; } encodedValues.add(id); @@ 
-516,11 +497,9 @@ public DictionaryPage toDictPageAndClose() { // return a dictionary only if we actually used it PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); - it.unimi.dsi.fastutil.ints.IntIterator intIterator = - intDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { - dictionaryEncoder.writeInteger(intIterator.nextInt()); + dictionaryEncoder.writeInteger(dictionaryValues.getInt(i)); } return dictPage(dictionaryEncoder); } @@ -535,24 +514,16 @@ public int getDictionarySize() { @Override protected void clearDictionaryContent() { intDictionaryContent.clear(); + dictionaryValues.clear(); } @Override public void fallBackDictionaryEncodedData(ValuesWriter writer) { - // build reverse dictionary - int[] reverseDictionary = new int[getDictionarySize()]; - ObjectIterator entryIterator = - intDictionaryContent.int2IntEntrySet().iterator(); - while (entryIterator.hasNext()) { - Int2IntMap.Entry entry = entryIterator.next(); - reverseDictionary[entry.getIntValue()] = entry.getIntKey(); - } - // fall back to plain encoding IntIterator iterator = encodedValues.iterator(); while (iterator.hasNext()) { int id = iterator.next(); - writer.writeInteger(reverseDictionary[id]); + writer.writeInteger(dictionaryValues.getInt(id)); } } } @@ -560,7 +531,8 @@ public void fallBackDictionaryEncodedData(ValuesWriter writer) { public static class PlainFloatDictionaryValuesWriter extends DictionaryValuesWriter { /* type specific dictionary content */ - private Float2IntMap floatDictionaryContent = new Float2IntLinkedOpenHashMap(); + private Float2IntMap floatDictionaryContent = new Float2IntOpenHashMap(); + private FloatArrayList dictionaryValues = new FloatArrayList(); public PlainFloatDictionaryValuesWriter( int maxDictionaryByteSize, @@ -575,8 +547,9 @@ public PlainFloatDictionaryValuesWriter( public void writeFloat(float v) { 
int id = floatDictionaryContent.get(v); if (id == -1) { - id = floatDictionaryContent.size(); + id = dictionaryValues.size(); floatDictionaryContent.put(v, id); + dictionaryValues.add(v); dictionaryByteSize += 4; } encodedValues.add(id); @@ -588,10 +561,9 @@ public DictionaryPage toDictPageAndClose() { // return a dictionary only if we actually used it PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); - FloatIterator floatIterator = floatDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { - dictionaryEncoder.writeFloat(floatIterator.nextFloat()); + dictionaryEncoder.writeFloat(dictionaryValues.getFloat(i)); } return dictPage(dictionaryEncoder); } @@ -606,24 +578,16 @@ public int getDictionarySize() { @Override protected void clearDictionaryContent() { floatDictionaryContent.clear(); + dictionaryValues.clear(); } @Override public void fallBackDictionaryEncodedData(ValuesWriter writer) { - // build reverse dictionary - float[] reverseDictionary = new float[getDictionarySize()]; - ObjectIterator entryIterator = - floatDictionaryContent.float2IntEntrySet().iterator(); - while (entryIterator.hasNext()) { - Float2IntMap.Entry entry = entryIterator.next(); - reverseDictionary[entry.getIntValue()] = entry.getFloatKey(); - } - // fall back to plain encoding IntIterator iterator = encodedValues.iterator(); while (iterator.hasNext()) { int id = iterator.next(); - writer.writeFloat(reverseDictionary[id]); + writer.writeFloat(dictionaryValues.getFloat(id)); } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java index a759ea038e..058795dc91 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java +++ 
b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java @@ -99,6 +99,7 @@ private void incrementPosition() { // not be added private int[] currentSlab; private int currentSlabPos; + private int totalSize; private void allocateSlab() { currentSlab = new int[currentSlabSize]; @@ -129,6 +130,7 @@ public void add(int i) { currentSlab[currentSlabPos] = i; ++currentSlabPos; + ++totalSize; } /** @@ -150,11 +152,6 @@ public IntIterator iterator() { * @return the current size of the list */ public int size() { - int size = currentSlabPos; - for (int[] slab : slabs) { - size += slab.length; - } - - return size; + return totalSize; } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java index 468c7d110f..1ee3dec8ad 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java @@ -24,15 +24,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.values.plain.BooleanPlainValuesReader; -import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader; -import org.apache.parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader; -import org.apache.parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader; -import org.apache.parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; @@ -147,12 +144,11 @@ public static class 
PlainLongDictionary extends PlainValuesDictionary { */ public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); + final ByteBuffer dictionaryBytes = + dictionaryPage.getBytes().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); longDictionaryContent = new long[dictionaryPage.getDictionarySize()]; - LongPlainValuesReader longReader = new LongPlainValuesReader(); - longReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < longDictionaryContent.length; i++) { - longDictionaryContent[i] = longReader.readLong(); + longDictionaryContent[i] = dictionaryBytes.getLong(); } } @@ -189,12 +185,11 @@ public static class PlainDoubleDictionary extends PlainValuesDictionary { */ public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); + final ByteBuffer dictionaryBytes = + dictionaryPage.getBytes().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()]; - DoublePlainValuesReader doubleReader = new DoublePlainValuesReader(); - doubleReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < doubleDictionaryContent.length; i++) { - doubleDictionaryContent[i] = doubleReader.readDouble(); + doubleDictionaryContent[i] = dictionaryBytes.getDouble(); } } @@ -231,12 +226,11 @@ public static class PlainIntegerDictionary extends PlainValuesDictionary { */ public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); + final ByteBuffer dictionaryBytes = + dictionaryPage.getBytes().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); intDictionaryContent = new int[dictionaryPage.getDictionarySize()]; - IntegerPlainValuesReader intReader = new 
IntegerPlainValuesReader(); - intReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < intDictionaryContent.length; i++) { - intDictionaryContent[i] = intReader.readInteger(); + intDictionaryContent[i] = dictionaryBytes.getInt(); } } @@ -273,12 +267,11 @@ public static class PlainFloatDictionary extends PlainValuesDictionary { */ public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); + final ByteBuffer dictionaryBytes = + dictionaryPage.getBytes().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); floatDictionaryContent = new float[dictionaryPage.getDictionarySize()]; - FloatPlainValuesReader floatReader = new FloatPlainValuesReader(); - floatReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < floatDictionaryContent.length; i++) { - floatDictionaryContent[i] = floatReader.readFloat(); + floatDictionaryContent[i] = dictionaryBytes.getFloat(); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java index e37ee12483..5c7600a43f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java @@ -37,6 +37,18 @@ public abstract class Binary implements Comparable, Serializable { protected boolean isBackingBytesReused; + /** + * Cached hash code for non-reused (immutable) Binary instances. + *

<p>The sentinel value {@code 0} means "not yet computed". This follows the + * {@link String#hashCode()} idiom: races between concurrent first calls are benign + * because the computation is deterministic, and a hash that genuinely equals {@code 0} + * will simply be recomputed on every call (acceptably rare). Reused instances never + * cache (their backing bytes can mutate after construction). + *
<p>
Package-private (rather than private) so subclasses can read it directly without + * an extra method call on the {@link #hashCode()} hot path. + */ + transient int cachedHashCode; + // this isn't really something others should extend private Binary() {} @@ -101,6 +113,18 @@ public boolean equals(Object obj) { return false; } + /** + * Caches {@code hashCode} for non-reused instances and returns it. The cache uses + * a single int field with sentinel {@code 0} to remain race-safe without volatile. + * If the computed hash is {@code 0}, no caching occurs and the next call recomputes. + */ + final int cacheHashCode(int hashCode) { + if (!isBackingBytesReused) { + cachedHashCode = hashCode; + } + return hashCode; + } + @Override public String toString() { return "Binary{" + length() @@ -180,7 +204,11 @@ public Binary slice(int start, int length) { @Override public int hashCode() { - return Binary.hashCode(value, offset, length); + int h = cachedHashCode; + if (h != 0) { + return h; + } + return cacheHashCode(Binary.hashCode(value, offset, length)); } @Override @@ -340,7 +368,11 @@ public Binary slice(int start, int length) { @Override public int hashCode() { - return Binary.hashCode(value, 0, value.length); + int h = cachedHashCode; + if (h != 0) { + return h; + } + return cacheHashCode(Binary.hashCode(value, 0, value.length)); } @Override @@ -499,11 +531,18 @@ public Binary slice(int start, int length) { @Override public int hashCode() { + int h = cachedHashCode; + if (h != 0) { + return h; + } + + int computedHashCode; if (value.hasArray()) { - return Binary.hashCode(value.array(), value.arrayOffset() + offset, length); + computedHashCode = Binary.hashCode(value.array(), value.arrayOffset() + offset, length); } else { - return Binary.hashCode(value, offset, length); + computedHashCode = Binary.hashCode(value, offset, length); } + return cacheHashCode(computedHashCode); } @Override diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/IntListTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/IntListTest.java index 6b5182ab73..74d6375723 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/IntListTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/IntListTest.java @@ -53,8 +53,15 @@ public void testListGreaterThanMaxSlabSize() { private void doTestIntList(int testSize, int expectedSlabSize) { IntList testList = new IntList(); + + // size() must be 0 before any adds + Assert.assertEquals(0, testList.size()); + populateList(testList, testSize); + // size() must match the number of elements added + Assert.assertEquals(testSize, testList.size()); + verifyIteratorResults(testSize, testList); // confirm the size of the current slab diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java index a91f807e73..2b1cd574f5 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java @@ -43,6 +43,7 @@ import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter; import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter; import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter; import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter; import 
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter; @@ -659,6 +660,73 @@ public void testFloatDictionaryFallBack() throws IOException { } } + private FallbackValuesWriter + newPlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int fixedLength, int initialSize) { + return plainFallBack( + new PlainFixedLenArrayDictionaryValuesWriter( + maxDictionaryByteSize, fixedLength, PLAIN_DICTIONARY, PLAIN_DICTIONARY, allocator), + initialSize); + } + + @Test + public void testFixedLenByteArrayDictionary() throws IOException { + int COUNT = 100; + int FIXED_LENGTH = 4; + try (final FallbackValuesWriter cw = + newPlainFixedLenArrayDictionaryValuesWriter(10000, FIXED_LENGTH, 10000)) { + // Write repeated fixed-length values + for (int i = 0; i < COUNT; i++) { + byte[] bytes = new byte[FIXED_LENGTH]; + bytes[0] = (byte) (i % 10); + cw.writeBytes(Binary.fromConstantByteArray(bytes)); + } + BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY); + assertEquals(10, cw.initialWriter.getDictionarySize()); + + // Read back via dictionary + DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, FIXED_LENGTH); + cr.initFromPage(COUNT, bytes1.toInputStream()); + for (int i = 0; i < COUNT; i++) { + Binary back = cr.readBytes(); + assertEquals(FIXED_LENGTH, back.length()); + assertEquals((byte) (i % 10), back.getBytesUnsafe()[0]); + } + } + } + + @Test + public void testFixedLenByteArrayDictionaryFallBack() throws IOException { + int FIXED_LENGTH = 4; + int maxDictionaryByteSize = 20; // fits at most 5 distinct 4-byte values + try (final FallbackValuesWriter cw = + newPlainFixedLenArrayDictionaryValuesWriter(maxDictionaryByteSize, FIXED_LENGTH, 100)) { + int fallBackThreshold = maxDictionaryByteSize / FIXED_LENGTH; + for (int i = 0; i < 100; i++) { + byte[] bytes = new byte[FIXED_LENGTH]; + bytes[0] = (byte) i; // all distinct + cw.writeBytes(Binary.fromConstantByteArray(bytes)); + if 
(i < fallBackThreshold) { + assertEquals(PLAIN_DICTIONARY, cw.getEncoding()); + } else { + assertEquals(PLAIN, cw.getEncoding()); + } + } + + // Fell back to plain, read with BinaryPlainValuesReader + ValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(100, cw.getBytes().toInputStream()); + for (int i = 0; i < 100; i++) { + Binary back = reader.readBytes(); + assertEquals(FIXED_LENGTH, back.length()); + assertEquals((byte) i, back.getBytesUnsafe()[0]); + } + + // simulate cutting the page + cw.reset(); + assertEquals(0, cw.getBufferedSize()); + } + } + @Test public void testZeroValues() throws IOException { try (FallbackValuesWriter cw = @@ -748,8 +816,13 @@ public void testBooleanDictionaryWithDictionaryEncoding() throws IOException { } private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type) throws IOException { + return initDicReader(cw, type, 0); + } + + private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type, int typeLength) + throws IOException { final DictionaryPage dictionaryPage = cw.toDictPageAndClose().copy(); - final ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"foo"}, type, 0, 0); + final ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"foo"}, type, typeLength, 0, 0); final Dictionary dictionary = PLAIN.initDictionary(descriptor, dictionaryPage); final DictionaryValuesReader cr = new DictionaryValuesReader(dictionary); return cr; diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java index a1a83af771..19085b2244 100644 --- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java +++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java @@ -155,6 +155,51 @@ public void testEqualityMethods() throws Exception { assertEquals(bin1, bin2); } + /** + * Verifies that {@link Binary#hashCode()} is cached for 
non-reused (constant) instances: + * the value returned must be stable, equal across the three concrete Binary + * implementations for the same bytes, and consistent with {@link Object#equals(Object)}. + */ + @Test + public void testHashCodeCachedForConstantBinary() { + byte[] bytes = "hash-cache-test".getBytes(); + + Binary[] constants = { + Binary.fromConstantByteArray(bytes), + Binary.fromConstantByteArray(bytes, 0, bytes.length), + Binary.fromConstantByteBuffer(ByteBuffer.wrap(bytes)), + }; + int reference = constants[0].hashCode(); + for (Binary b : constants) { + int first = b.hashCode(); + int second = b.hashCode(); + assertEquals("repeated hashCode for " + b.getClass().getSimpleName(), first, second); + assertEquals( + "cross-impl hashCode for " + b.getClass().getSimpleName() + " must equal reference", + reference, + first); + } + } + + /** + * Verifies that reused (mutable backing) Binary instances do not return a stale cached + * hash code when their backing bytes change between calls. + */ + @Test + public void testHashCodeNotCachedForReusedBinary() { + byte[] bytes = "first".getBytes(); + Binary reused = Binary.fromReusedByteArray(bytes); + int firstHash = reused.hashCode(); + int constHashFirst = Binary.fromConstantByteArray(bytes).hashCode(); + assertEquals(constHashFirst, firstHash); + + byte[] mutated = "second-value".getBytes(); + reused = Binary.fromReusedByteArray(mutated); + int secondHash = reused.hashCode(); + int constHashSecond = Binary.fromConstantByteArray(mutated).hashCode(); + assertEquals(constHashSecond, secondHash); + } + @Test public void testWriteAllTo() throws Exception { byte[] orig = {10, 9, 8, 7, 6, 5, 4, 3, 2, 1};