Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions parquet-variant/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public final class Variant {
* Lazy cache for the parsed array header.
*/
private VariantUtil.ArrayInfo cachedArrayInfo;
/** Nesting depth of this Variant relative to the top-level value (0 = top-level). */
private final int depth;

/**
* The threshold to switch from linear search to binary search when looking up a field by key in
Expand All @@ -84,7 +86,7 @@ public Variant(byte[] value, int valuePos, int valueLength, byte[] metadata, int
public Variant(ByteBuffer value, ByteBuffer metadata) {
this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);

this.depth = 0;
// There is currently only one allowed version.
if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) != VariantUtil.VERSION) {
throw new UnsupportedOperationException(String.format(
Expand All @@ -108,16 +110,23 @@ public Variant(ByteBuffer value, ByteBuffer metadata) {
this.dictSize = 0;
}
this.metadataCache = null;
VariantUtil.validateValueShallow(this.value, dictSize);
}

/**
* Package-private constructor that shares pre-parsed metadata state from a parent Variant.
*/
Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) {
Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize, int depth) {
// Refuse to build a Variant that sits deeper than the permitted nesting limit. This runs
// before any field is assigned, so an over-deep child is rejected without further work.
Preconditions.checkArgument(
depth <= VariantUtil.MAX_VARIANT_DEPTH,
"variant nesting depth exceeds maximum %s",
VariantUtil.MAX_VARIANT_DEPTH);
// Read-only, little-endian views of the supplied buffers.
this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
// Dictionary state is taken from the caller as-is rather than re-parsed from the metadata.
this.metadataCache = metadataCache;
this.dictSize = dictSize;
this.depth = depth;
// Bounds-check this node only; the check does not recurse into nested children.
VariantUtil.validateValueShallow(this.value, dictSize);
}

public ByteBuffer getValueBuffer() {
Expand Down Expand Up @@ -361,7 +370,7 @@ public Variant getElementAtIndex(int index) {
* Creates a child Variant that shares this instance's metadata cache.
*/
private Variant childVariant(ByteBuffer childValue) {
return new Variant(childValue, metadata, metadataCache, dictSize);
return new Variant(childValue, metadata, metadataCache, dictSize, depth + 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.parquet.variant;

import static org.apache.parquet.variant.VariantUtil.MAX_VARIANT_DEPTH;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -37,7 +39,7 @@ public final class VariantJsonParser {

private static final JsonFactory JSON_FACTORY = JsonFactory.builder()
.streamReadConstraints(StreamReadConstraints.builder()
.maxNestingDepth(500)
.maxNestingDepth(MAX_VARIANT_DEPTH)
.maxStringLength(10_000_000)
.maxDocumentLength(50_000_000L)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class VariantUtil {
// The size (in bytes) of a UUID.
static final int UUID_SIZE = 16;

/**
* Maximum permitted nesting depth of a Variant value.
* This is the same limit that VariantJsonParser applies to JSON nesting depth.
*/
static final int MAX_VARIANT_DEPTH = 500;

// header bytes
static final byte HEADER_NULL = primitiveHeader(NULL);
static final byte HEADER_LONG_STRING = primitiveHeader(LONG_STR);
Expand Down Expand Up @@ -874,6 +880,160 @@ static HashMap<String, Integer> getMetadataMap(ByteBuffer metadata) {
return result;
}

/**
* Bounds-checks the metadata buffer: verifies the header version and that the dictionary size
* field and the dictionary offset table fit within the buffer extent. It does not inspect the
* individual offsets or the string data they refer to.
*
* @param metadata the variant metadata buffer
* @return the dictionary size
* @throws IllegalArgumentException if the metadata buffer is not well-formed
*/
static int validateMetadata(ByteBuffer metadata) {
  int pos = metadata.position();
  Preconditions.checkArgument(pos >= 0 && pos < metadata.limit(), "variant metadata is empty");
  int header = metadata.get(pos) & 0xFF;
  Preconditions.checkArgument(
      (header & VERSION_MASK) == VERSION, "Unsupported variant metadata version: %s", header & VERSION_MASK);
  // Bits 6-7 of the header byte hold (offsetSize - 1), so offsetSize is always in [1, 4].
  int offsetSize = ((header >> 6) & 0x3) + 1;
  // Extents are tracked as longs relative to pos so that the sums below cannot wrap.
  long remaining = (long) metadata.limit() - pos;
  // One header byte followed by the dictionary-size field, which is offsetSize bytes wide.
  long offsetListStart = 1L + offsetSize;
  Preconditions.checkArgument(offsetListStart <= remaining, "variant metadata truncated");
  int dictSize = readUnsigned(metadata, pos + 1, offsetSize);
  // The offset table holds dictSize + 1 entries. Widen to long BEFORE adding one: computing
  // dictSize + 1 in int wraps to a negative value when dictSize == Integer.MAX_VALUE, which
  // would make dataStart negative and let a truncated buffer pass the check below.
  long offsetBytes = ((long) dictSize + 1L) * offsetSize;
  long dataStart = offsetListStart + offsetBytes;
  Preconditions.checkArgument(
      dataStart <= remaining, "variant metadata dictionary table extends past buffer: dictSize=%s", dictSize);
  return dictSize;
}

/**
* Bounds-checks a single Variant value node against its buffer slot. Performs no recursion
* into nested children: child nodes are checked on demand when callers descend into them.
*
* <p>Cost: O(1) for primitives and short strings, O(numElements) for objects and arrays.
* Validation of nested structures is deferred so that opening a large well-formed Variant
* is not penalised by sub-trees the caller never inspects.
*
* @param value the variant value buffer (position/limit define the extent of this node's slot)
* @param dictSize the metadata dictionary size, used to bound object field ids
* @throws IllegalArgumentException if the value header or container table does not fit within
* the buffer slot, or if any object field id is out of range
*/
static void validateValueShallow(ByteBuffer value, int dictSize) {
  final int start = value.position();
  Preconditions.checkArgument(start >= 0 && start < value.limit(), "variant value is empty");

  // Bytes available to this node, counted from its header byte.
  final long available = (long) value.limit() - start;
  final int headerByte = value.get(start) & 0xFF;
  final int kind = headerByte & BASIC_TYPE_MASK;
  final int info = (headerByte >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK;

  if (kind == SHORT_STR) {
    // The header byte plus `info` further bytes must fit in the slot.
    Preconditions.checkArgument(1L + info <= available, "variant short string extends past buffer");
  } else if (kind == OBJECT) {
    validateContainerShallow(value, start, available, dictSize, true, info);
  } else if (kind == ARRAY) {
    validateContainerShallow(value, start, available, dictSize, false, info);
  } else {
    // Every remaining basic type is handled as a primitive.
    validatePrimitiveShallow(value, start, available, info);
  }
}

/**
 * Bounds-checks the header, field-id table and offset table of a single object or array node.
 *
 * <p>Layout assumed by the checks, relative to {@code s}: one header byte, the element count
 * (1 byte, or {@code U32_SIZE} bytes when the large-size flag is set), for objects
 * {@code numElements} field ids of {@code idSize} bytes each, then {@code numElements + 1}
 * offsets of {@code offsetSize} bytes each, then the child data region.
 *
 * @param value the variant value buffer
 * @param s absolute index of this node's header byte within {@code value}
 * @param slot number of bytes available to this node, starting at {@code s}
 * @param dictSize the metadata dictionary size; every object field id must be below it
 * @param isObject {@code true} for an object, {@code false} for an array
 * @param typeInfo the type-info bits taken from the node's header byte
 * @throws IllegalArgumentException if a table does not fit in the slot, a field id is out of
 *     range, or an offset points past the data region
 */
private static void validateContainerShallow(
    ByteBuffer value, int s, long slot, int dictSize, boolean isObject, int typeInfo) {
  boolean largeSize;
  int idSize;
  if (isObject) {
    // Object type info: bit 4 = large-size flag, bits 2-3 = idSize - 1.
    largeSize = ((typeInfo >> 4) & 0x1) != 0;
    idSize = ((typeInfo >> 2) & 0x3) + 1;
  } else {
    // Array type info: bit 2 = large-size flag; arrays carry no field ids.
    largeSize = ((typeInfo >> 2) & 0x1) != 0;
    idSize = 0;
  }
  // Bits 0-1 = offsetSize - 1 for both container kinds.
  int offsetSize = (typeInfo & 0x3) + 1;
  int sizeBytes = largeSize ? U32_SIZE : 1;
  Preconditions.checkArgument(1L + sizeBytes <= slot, "variant container header truncated");
  int numElements = readUnsigned(value, s + 1, sizeBytes);
  long idStart = 1L + sizeBytes;
  long idBytes = isObject ? (long) numElements * idSize : 0L;
  long offsetStart = idStart + idBytes;
  // Widen to long BEFORE adding one. Computing numElements + 1 in int wraps to a negative
  // value when numElements == Integer.MAX_VALUE, which shrinks dataStart, lets the bounds
  // check below pass, and sends the loops off with an effectively unbounded element count.
  long offsetBytes = ((long) numElements + 1L) * offsetSize;
  long dataStart = offsetStart + offsetBytes;
  Preconditions.checkArgument(
      dataStart <= slot, "variant container offset table extends past buffer: numElements=%s", numElements);
  // From here on both tables are known to fit inside the slot, so the int index arithmetic
  // in the loops below stays within the buffer's int-sized extent.
  long dataLen = slot - dataStart;
  if (isObject) {
    for (int i = 0; i < numElements; i++) {
      int id = readUnsigned(value, s + (int) idStart + i * idSize, idSize);
      Preconditions.checkArgument(
          id < dictSize, "variant object key id %s out of range (dictSize=%s)", id, dictSize);
    }
  }
  // Each child offset must lie within the data region. Children may overlap or leave gaps,
  // so offsets are not required to be ordered; the trailing terminator offset (index
  // numElements) is range-checked in the same way.
  for (int i = 0; i <= numElements; i++) {
    int off = readUnsigned(value, s + (int) offsetStart + i * offsetSize, offsetSize);
    Preconditions.checkArgument(
        off <= dataLen, "variant child offset out of range: %s (data length %s)", off, dataLen);
  }
}

/**
 * Checks that a primitive node's encoded size, header byte included, fits in its slot.
 *
 * @param value the variant value buffer
 * @param s absolute index of this node's header byte within {@code value}
 * @param slot number of bytes available to this node, starting at {@code s}
 * @param typeInfo the primitive type id taken from the node's header byte
 * @throws IllegalArgumentException if the type id is unknown or the value does not fit
 */
private static void validatePrimitiveShallow(ByteBuffer value, int s, long slot, int typeInfo) {
  // Total encoded size of the node, counting the header byte.
  final long required;
  switch (typeInfo) {
    // Variable-length types: the length field must fit before it can be read.
    case BINARY:
    case LONG_STR:
      Preconditions.checkArgument(1L + U32_SIZE <= slot, "variant string/binary length field truncated");
      required = 1L + U32_SIZE + readUnsigned(value, s + 1, U32_SIZE);
      break;

    // Fixed-length types, largest first.
    case DECIMAL16:
      required = 18L;
      break;
    case UUID:
      required = 1L + UUID_SIZE;
      break;
    case DECIMAL8:
      required = 10L;
      break;
    case INT64:
    case DOUBLE:
    case TIME:
    case TIMESTAMP_TZ:
    case TIMESTAMP_NTZ:
    case TIMESTAMP_NANOS_TZ:
    case TIMESTAMP_NANOS_NTZ:
      required = 9L;
      break;
    case DECIMAL4:
      required = 6L;
      break;
    case INT32:
    case FLOAT:
    case DATE:
      required = 5L;
      break;
    case INT16:
      required = 3L;
      break;
    case INT8:
      required = 2L;
      break;

    // Header-only values.
    case NULL:
    case TRUE:
    case FALSE:
      required = 1L;
      break;

    default:
      throw new IllegalArgumentException(String.format("Unknown primitive type in variant: %d", typeInfo));
  }
  Preconditions.checkArgument(required <= slot, "variant value extends past buffer");
}

/**
* Computes the actual size (in bytes) of the Variant value.
* @param value The Variant value binary
Expand Down
Loading