Skip to content

Commit da5ffce

Browse files
authored
Fix for vectorized builder variant handling (#16087)
* Fix for vectorized builder variant handling
* Simplify test query and add regression test
* PR comment: add describedAs for keys
* Add MERGE INTO test for Spark 4.0
* PR comment: add test for variant not in projection
1 parent 0011a85 commit da5ffce

4 files changed

Lines changed: 200 additions & 0 deletions

File tree

arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,16 @@ public VectorizedReader<?> struct(
154154
return null;
155155
}
156156

157+
@Override
158+
public VectorizedReader<?> variant(
159+
Types.VariantType iVariant, GroupType variant, VectorizedReader<?> result) {
160+
if (iVariant != null) {
161+
throw new UnsupportedOperationException(
162+
"Vectorized reads are not supported yet for variant fields");
163+
}
164+
return null;
165+
}
166+
157167
@Override
158168
public VectorizedReader<?> primitive(
159169
org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.arrow.vectorized;
20+
21+
import static org.assertj.core.api.Assertions.assertThatNoException;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23+
24+
import org.apache.iceberg.Schema;
25+
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
26+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
27+
import org.apache.iceberg.types.Types.IntegerType;
28+
import org.apache.iceberg.types.Types.NestedField;
29+
import org.apache.iceberg.types.Types.VariantType;
30+
import org.apache.iceberg.variants.Variant;
31+
import org.apache.parquet.schema.LogicalTypeAnnotation;
32+
import org.apache.parquet.schema.MessageType;
33+
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
34+
import org.apache.parquet.schema.Type;
35+
import org.apache.parquet.schema.Types;
36+
import org.junit.jupiter.api.Test;
37+
38+
public class TestVectorizedReaderBuilder {
39+
40+
@Test
41+
public void testVariantNotSupportedInVectorizedReads() {
42+
Schema icebergSchema =
43+
new Schema(
44+
NestedField.required(1, "id", IntegerType.get()),
45+
NestedField.optional(2, "data", VariantType.get()));
46+
47+
MessageType parquetSchema = parquetSchemaWithVariant();
48+
49+
VectorizedReaderBuilder builder =
50+
new VectorizedReaderBuilder(
51+
icebergSchema, parquetSchema, false, ImmutableMap.of(), readers -> null);
52+
53+
assertThatThrownBy(
54+
() -> TypeWithSchemaVisitor.visit(icebergSchema.asStruct(), parquetSchema, builder))
55+
.isInstanceOf(UnsupportedOperationException.class)
56+
.hasMessageContaining("Vectorized reads are not supported yet for variant fields");
57+
}
58+
59+
@Test
60+
public void testVariantSkippedWhenNotInProjection() {
61+
Schema icebergSchema = new Schema(NestedField.required(1, "id", IntegerType.get()));
62+
63+
MessageType parquetSchema = parquetSchemaWithVariant();
64+
65+
VectorizedReaderBuilder builder =
66+
new VectorizedReaderBuilder(
67+
icebergSchema, parquetSchema, false, ImmutableMap.of(), readers -> null);
68+
69+
assertThatNoException()
70+
.describedAs("Variant not in projection should not throw")
71+
.isThrownBy(
72+
() -> TypeWithSchemaVisitor.visit(icebergSchema.asStruct(), parquetSchema, builder));
73+
}
74+
75+
private static MessageType parquetSchemaWithVariant() {
76+
return Types.buildMessage()
77+
.addField(
78+
Types.primitive(PrimitiveTypeName.INT32, Type.Repetition.REQUIRED).id(1).named("id"))
79+
.addField(
80+
Types.buildGroup(Type.Repetition.OPTIONAL)
81+
.as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
82+
.addField(
83+
Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
84+
.named("metadata"))
85+
.addField(
86+
Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
87+
.named("value"))
88+
.id(2)
89+
.named("data"))
90+
.named("table");
91+
}
92+
}

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,55 @@ public void testNestedMapVariant(boolean vectorized) {
302302
sql("DROP TABLE IF EXISTS %s", mapTable);
303303
}
304304

305+
@ParameterizedTest
306+
@ValueSource(booleans = {false, true})
307+
public void testMergeIntoWithVariant(boolean vectorized) {
308+
// Variant columns are not vectorized yet, but MERGE INTO should not crash regardless of the
309+
// vectorization setting. The reader falls back to non-vectorized for variant columns.
310+
String mergeTable = CATALOG + ".default.var_merge";
311+
sql("DROP TABLE IF EXISTS %s", mergeTable);
312+
sql(
313+
"CREATE TABLE %s (id BIGINT, data VARIANT) USING iceberg "
314+
+ "TBLPROPERTIES ('format-version'='3')",
315+
mergeTable);
316+
setVectorization(mergeTable, vectorized);
317+
318+
sql(
319+
"INSERT INTO %s VALUES "
320+
+ "(1, parse_json('{\"name\":\"alice\",\"age\":30}')), "
321+
+ "(2, parse_json('{\"name\":\"bob\",\"age\":25}'))",
322+
mergeTable);
323+
324+
sql(
325+
"MERGE INTO %s AS target "
326+
+ "USING (SELECT 1 AS id, parse_json('{\"name\":\"alice\",\"age\":31}') AS data) AS source "
327+
+ "ON target.id = source.id "
328+
+ "WHEN MATCHED THEN UPDATE SET target.data = source.data "
329+
+ "WHEN NOT MATCHED THEN INSERT *",
330+
mergeTable);
331+
332+
List<Row> rows = spark.table(mergeTable).select("id", "data").orderBy("id").collectAsList();
333+
334+
assertThat(rows).hasSize(2);
335+
assertThat(rows.get(0).getLong(0)).isEqualTo(1L);
336+
Variant v1 =
337+
new Variant(
338+
((VariantVal) rows.get(0).get(1)).getValue(),
339+
((VariantVal) rows.get(0).get(1)).getMetadata());
340+
assertThat(v1.getFieldByKey("name").getString()).describedAs("v1.name").isEqualTo("alice");
341+
assertThat(v1.getFieldByKey("age").getLong()).describedAs("v1.age").isEqualTo(31L);
342+
343+
assertThat(rows.get(1).getLong(0)).isEqualTo(2L);
344+
Variant v2 =
345+
new Variant(
346+
((VariantVal) rows.get(1).get(1)).getValue(),
347+
((VariantVal) rows.get(1).get(1)).getMetadata());
348+
assertThat(v2.getFieldByKey("name").getString()).describedAs("v2.name").isEqualTo("bob");
349+
assertThat(v2.getFieldByKey("age").getLong()).describedAs("v2.age").isEqualTo(25L);
350+
351+
sql("DROP TABLE IF EXISTS %s", mergeTable);
352+
}
353+
305354
private void setVectorization(boolean on) {
306355
sql(
307356
"ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')",

spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,55 @@ public void testNestedMapVariant(boolean vectorized) {
302302
sql("DROP TABLE IF EXISTS %s", mapTable);
303303
}
304304

305+
@ParameterizedTest
306+
@ValueSource(booleans = {false, true})
307+
public void testMergeIntoWithVariant(boolean vectorized) {
308+
// Variant columns are not vectorized yet, but MERGE INTO should not crash regardless of the
309+
// vectorization setting. The reader falls back to non-vectorized for variant columns.
310+
String mergeTable = CATALOG + ".default.var_merge";
311+
sql("DROP TABLE IF EXISTS %s", mergeTable);
312+
sql(
313+
"CREATE TABLE %s (id BIGINT, data VARIANT) USING iceberg "
314+
+ "TBLPROPERTIES ('format-version'='3')",
315+
mergeTable);
316+
setVectorization(mergeTable, vectorized);
317+
318+
sql(
319+
"INSERT INTO %s VALUES "
320+
+ "(1, parse_json('{\"name\":\"alice\",\"age\":30}')), "
321+
+ "(2, parse_json('{\"name\":\"bob\",\"age\":25}'))",
322+
mergeTable);
323+
324+
sql(
325+
"MERGE INTO %s AS target "
326+
+ "USING (SELECT 1 AS id, parse_json('{\"name\":\"alice\",\"age\":31}') AS data) AS source "
327+
+ "ON target.id = source.id "
328+
+ "WHEN MATCHED THEN UPDATE SET target.data = source.data "
329+
+ "WHEN NOT MATCHED THEN INSERT *",
330+
mergeTable);
331+
332+
List<Row> rows = spark.table(mergeTable).select("id", "data").orderBy("id").collectAsList();
333+
334+
assertThat(rows).hasSize(2);
335+
assertThat(rows.get(0).getLong(0)).isEqualTo(1L);
336+
Variant v1 =
337+
new Variant(
338+
((VariantVal) rows.get(0).get(1)).getValue(),
339+
((VariantVal) rows.get(0).get(1)).getMetadata());
340+
assertThat(v1.getFieldByKey("name").getString()).describedAs("v1.name").isEqualTo("alice");
341+
assertThat(v1.getFieldByKey("age").getLong()).describedAs("v1.age").isEqualTo(31L);
342+
343+
assertThat(rows.get(1).getLong(0)).isEqualTo(2L);
344+
Variant v2 =
345+
new Variant(
346+
((VariantVal) rows.get(1).get(1)).getValue(),
347+
((VariantVal) rows.get(1).get(1)).getMetadata());
348+
assertThat(v2.getFieldByKey("name").getString()).describedAs("v2.name").isEqualTo("bob");
349+
assertThat(v2.getFieldByKey("age").getLong()).describedAs("v2.age").isEqualTo(25L);
350+
351+
sql("DROP TABLE IF EXISTS %s", mergeTable);
352+
}
353+
305354
private void setVectorization(boolean on) {
306355
sql(
307356
"ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')",

0 commit comments

Comments
 (0)