From fc48872b9adb721354c231f65c2e2c6c5ae105ac Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 4 Mar 2026 23:20:11 +0800 Subject: [PATCH 01/14] Flink:Support writing shredded variant --- .../apache/iceberg/flink/FlinkWriteConf.java | 18 + .../iceberg/flink/FlinkWriteOptions.java | 6 + .../iceberg/flink/data/FlinkFormatModels.java | 5 +- .../data/FlinkVariantShreddingAnalyzer.java | 65 ++ .../apache/iceberg/flink/sink/SinkUtil.java | 6 + .../flink/TestFlinkVariantShreddingType.java | 844 ++++++++++++++++++ .../iceberg/parquet/ParquetFormatModel.java | 14 +- .../parquet/TestParquetDataWriter.java | 4 +- .../spark/source/SparkFormatModels.java | 2 +- 9 files changed, 953 insertions(+), 11 deletions(-) create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java create mode 100644 flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 990d23f2aaff..bc648923f1bd 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -262,4 +262,22 @@ public Duration tableRefreshInterval() { .flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL) .parseOptional(); } + + public boolean shredVariants() { + return confParser + .booleanConf() + .option(FlinkWriteOptions.SHRED_VARIANTS.key()) + .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS) + .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT) + .parse(); + } + + public int variantInferenceBufferSize() { + return confParser + .intConf() + .option(FlinkWriteOptions.VARIANT_INFERENCE_BUFFER_SIZE.key()) + .tableProperty(TableProperties.PARQUET_VARIANT_BUFFER_SIZE) + .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT) + .parse(); + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index ee2aeaa45007..3d0866213f59 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -105,4 +105,10 @@ private FlinkWriteOptions() {} // specify the uidSuffix to be used for the underlying IcebergSink public static final ConfigOption UID_SUFFIX = ConfigOptions.key("uid-suffix").stringType().defaultValue(""); + + public static final ConfigOption SHRED_VARIANTS = + ConfigOptions.key("shred-variants").booleanType().defaultValue(false); + + public static final ConfigOption VARIANT_INFERENCE_BUFFER_SIZE = + ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(10); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java index dd713b0dce2a..c044dd15014d 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.data; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.avro.AvroFormatModel; import org.apache.iceberg.formats.FormatModelRegistry; @@ -33,7 +34,9 @@ public static void register() { RowType.class, FlinkParquetWriters::buildWriter, (icebergSchema, fileSchema, engineSchema, idToConstant) -> - FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), + new FlinkVariantShreddingAnalyzer(), + (row, rowType) -> new RowDataSerializer(rowType).copy(row))); FormatModelRegistry.register( AvroFormatModel.create( diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java new file mode 100644 index 000000000000..4e39cad89f9d --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.variant.BinaryVariant; +import org.apache.iceberg.parquet.VariantShreddingAnalyzer; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; + +public class FlinkVariantShreddingAnalyzer extends VariantShreddingAnalyzer { + + @Override + protected List extractVariantValues( + List bufferedRows, int variantFieldIndex) { + List values = Lists.newArrayList(); + + for (RowData row : bufferedRows) { + if (!row.isNullAt(variantFieldIndex)) { + BinaryVariant flinkVariant = (BinaryVariant) row.getVariant(variantFieldIndex); + if (flinkVariant != null) { + VariantValue variantValue = + VariantValue.from( + VariantMetadata.from( + ByteBuffer.wrap(flinkVariant.getMetadata()).order(ByteOrder.LITTLE_ENDIAN)), + ByteBuffer.wrap(flinkVariant.getValue()).order(ByteOrder.LITTLE_ENDIAN)); + + values.add(variantValue); + } + } + } + + return values; + } + + @Override + protected int resolveColumnIndex(RowType flinkSchema, String columnName) { + try { + return flinkSchema.getFieldIndex(columnName); + } catch (IllegalArgumentException e) { + return -1; + } + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index b3a9ac6ba2eb..6cb9a25bbedf 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -24,6 +24,8 @@ import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS; +import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE; import java.util.List; import java.util.Map; @@ -128,6 +130,10 @@ public static Map writeProperties( writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); } + writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(conf.shredVariants())); + writeProperties.put( + PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(conf.variantInferenceBufferSize())); + break; case AVRO: writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java new file mode 100644 index 000000000000..6d8dc572bf03 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java @@ -0,0 +1,844 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import static org.apache.parquet.schema.Types.optional; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.flink.source.reader.ReaderUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.variants.Variant; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; + +class TestFlinkVariantShreddingType extends CatalogTestBase { + + private static final String TABLE_NAME = "test_table"; + private Table icebergTable; + + @Parameters(name = "catalogName={0}, baseNamespace={1}") + protected static List parameters() { + List parameters = Lists.newArrayList(); + parameters.add(new Object[] {"testhadoop", Namespace.empty()}); + parameters.add(new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")}); + return parameters; + } + + @Override + @BeforeEach + public void before() { + super.before(); + sql("CREATE DATABASE %s", flinkDatabase); + sql("USE CATALOG %s", catalogName); + sql("USE %s", DATABASE); + sql( + "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')", + TABLE_NAME, FileFormat.PARQUET.name()); + icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); + } + + @TestTemplate + public void testExcludingNullValue() throws IOException { + String values = + "(1, parse_json('{\"name\": \"Alice\", \"age\": 30, \"dummy\": null}'))," + + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," + + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType name = + field( + "name", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType age = + field( + "age", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8, true))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(age, name)); + MessageType expectedSchema = parquetSchema(address); + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testConsistentType() throws IOException { + String values = + "(1, parse_json('{\"age\": \"25\"}'))," + + " (2, parse_json('{\"age\": 30}'))," + + " (3, parse_json('{\"age\": \"35\"}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType age = + field( + "age", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(age)); + MessageType expectedSchema = parquetSchema(address); + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testPrimitiveType() throws IOException { + String values = "(1, parse_json('123')), (2, parse_json('\"abc\"')), (3, parse_json('12'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType address = + variant( + "address", + 2, + Type.Repetition.REQUIRED, + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8, true))); + MessageType expectedSchema = parquetSchema(address); + + assertThat(SimpleDataUtil.tableRecords(icebergTable)).hasSize(3); + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testPrimitiveDecimalType() throws IOException { + String values = + "(1, parse_json('123.56')), (2, parse_json('\"abc\"')), (3, parse_json('12.56'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType address = + variant( + "address", + 2, + Type.Repetition.REQUIRED, + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.decimalType(2, 5))); + MessageType expectedSchema = parquetSchema(address); + assertThat(SimpleDataUtil.tableRecords(icebergTable)).hasSize(3); + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testBooleanType() throws IOException { + String values = + "(1, parse_json('{\"active\": true}'))," + + " (2, parse_json('{\"active\": false}'))," + + " (3, parse_json('{\"active\": true}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType active = field("active", shreddedPrimitive(PrimitiveType.PrimitiveTypeName.BOOLEAN)); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(active)); + MessageType expectedSchema = parquetSchema(address); + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testDecimalTypeWithInconsistentScales() throws IOException { + String values = + "(1, parse_json('{\"price\": 123.456789}'))," + + " (2, parse_json('{\"price\": 678.90}'))," + + " (3, parse_json('{\"price\": 999.99}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType price = + field( + "price", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.decimalType(6, 9))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(price)); + MessageType expectedSchema = parquetSchema(address); + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testDecimalTypeWithConsistentScales() throws IOException { + String values = + "(1, parse_json('{\"price\": 123.45}'))," + + " (2, parse_json('{\"price\": 678.90}'))," + + " (3, parse_json('{\"price\": 999.99}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType price = + field( + "price", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.decimalType(2, 5))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(price)); + MessageType expectedSchema = parquetSchema(address); + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testArrayType() throws IOException { + String values = + "(1, parse_json('[\"java\", \"scala\", \"python\"]'))," + + " (2, parse_json('[\"rust\", \"go\"]'))," + + " (3, parse_json('[\"javascript\"]'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType arr = + list( + element( + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType()))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, arr); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testNestedArrayType() throws IOException { + + String values = + "(1, parse_json('{\"tags\": [\"java\", \"scala\", \"python\"]}'))," + + " (2, parse_json('{\"tags\": [\"rust\", \"go\"]}'))," + + " (3, parse_json('{\"tags\": [\"javascript\"]}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType tags = + field( + "tags", + list( + element( + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.stringType())))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(tags)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testNestedObjectType() throws IOException { + + String values = + "(1, parse_json('{\"location\": {\"city\": \"Seattle\", \"zip\": 98101}, \"tags\": [\"java\", \"scala\", \"python\"]}'))," + + " (2, parse_json('{\"location\": {\"city\": \"Portland\", \"zip\": 97201}}'))," + + " (3, parse_json('{\"location\": {\"city\": \"NYC\", \"zip\": 10001}}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType city = + field( + "city", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType zip = + field( + "zip", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(32, true))); + GroupType location = field("location", objectFields(city, zip)); + GroupType tags = + field( + "tags", + list( + element( + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.stringType())))); + + GroupType address = + variant("address", 2, Type.Repetition.REQUIRED, objectFields(location, tags)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testLazyInitializationWithBufferedRows() throws IOException { + + String values = + "(1, parse_json('{\"name\": \"Alice\", \"age\": 30}'))," + + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," + + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))," + + " (4, parse_json('{\"name\": \"David\", \"age\": 28}'))," + + " (5, parse_json('{\"name\": \"Eve\", \"age\": 32}'))," + + " (6, parse_json('{\"name\": \"Frank\", \"age\": 40}'))," + + " (7, parse_json('{\"name\": \"Grace\", \"age\": 27}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType name = + field( + "name", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType age = + field( + "age", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8, true))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(age, name)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + assertThat(genericRowData()).hasSize(7); + } + + @TestTemplate + public void testColumnIndexTruncateLength() throws IOException { + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + + int customTruncateLength = 10; + sql( + "ALTER TABLE %s SET ('%s'='%d')", + TABLE_NAME, "parquet.columnindex.truncate.length", customTruncateLength); + + StringBuilder valuesBuilder = new StringBuilder(); + for (int i = 1; i <= 10; i++) { + if (i > 1) { + valuesBuilder.append(", "); + } + + String longValue = "A".repeat(20); + valuesBuilder.append( + String.format( + "(%d, parse_json('{\"description\": \"%s\", \"id\": %d}'))", i, longValue, i)); + } + sql("INSERT INTO %s VALUES %s", TABLE_NAME, valuesBuilder.toString()); + + GroupType description = + field( + "description", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType id = + field( + "id", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8, true))); + GroupType address = + variant("address", 2, Type.Repetition.REQUIRED, objectFields(description, id)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + assertThat(genericRowData()).hasSize(10); + } + + @TestTemplate + public void testIntegerFamilyPromotion() throws IOException { + + // Mix of INT8, INT16, INT32, INT64 - should promote to INT64 + String values = + "(1, parse_json('{\"value\": 10}'))," + + " (2, parse_json('{\"value\": 1000}'))," + + " (3, parse_json('{\"value\": 100000}'))," + + " (4, parse_json('{\"value\": 10000000000}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType value = + field( + "value", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, LogicalTypeAnnotation.intType(64, true))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(value)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testDecimalFamilyPromotion() throws IOException { + + // Test that they get promoted to the most capable decimal type observed + String values = + "(1, parse_json('{\"value\": 1.5}'))," + + " (2, parse_json('{\"value\": 123.456789}'))," + + " (3, parse_json('{\"value\": 123456789123456.789}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType value = + field( + "value", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + 16, + LogicalTypeAnnotation.decimalType(6, 21))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(value)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testDataRoundTripWithShredding() throws IOException { + String values = + "(1, parse_json('{\"name\": \"Alice\", \"age\": 30}'))," + + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," + + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType name = + field( + "name", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType age = + field( + "age", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8, true))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(age, name)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + + // Verify that we can read the data back correctly + List rows = + sql( + "SELECT id, JSON_VALUE(address, '$.name')," + + " JSON_VALUE(address, '$.age' RETURNING int)" + + " FROM %s ORDER BY id", + TABLE_NAME); + assertThat(rows).hasSize(3); + assertThat(rows.get(0).getField(0)).isEqualTo(1); + assertThat(rows.get(0).getField(1)).isEqualTo("Alice"); + assertThat(rows.get(0).getField(2)).isEqualTo(30); + assertThat(rows.get(1).getField(0)).isEqualTo(2); + assertThat(rows.get(1).getField(1)).isEqualTo("Bob"); + assertThat(rows.get(1).getField(2)).isEqualTo(25); + assertThat(rows.get(2).getField(0)).isEqualTo(3); + assertThat(rows.get(2).getField(1)).isEqualTo("Charlie"); + assertThat(rows.get(2).getField(2)).isEqualTo(35); + } + + @TestTemplate + public void testVariantWithNullValues() throws IOException { + + String values = + "(1, parse_json('null'))," + " (2, parse_json('null'))," + " (3, parse_json('null'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + GroupType address = variant("address", 2, Type.Repetition.REQUIRED); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testArrayOfNullElementsWithShredding() throws IOException { + + sql( + "INSERT INTO %s VALUES (1, parse_json('[null, null, null]')), " + + "(2, parse_json('[null]'))", + TABLE_NAME); + + // Array elements are all null, element type is null, falls back to unshredded + GroupType address = variant("address", 2, Type.Repetition.REQUIRED); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testInfrequentFieldPruning() throws IOException { + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='11')", TABLE_NAME); + StringBuilder valuesBuilder = new StringBuilder(); + for (int i = 1; i <= 11; i++) { + if (i > 1) { + valuesBuilder.append(", "); + } + + if (i == 1) { + // Only the first row has rare_field + valuesBuilder.append( + String.format( + "(%d, parse_json('{\"name\": \"User%d\", \"rare_field\": \"rare\"}'))", i, i)); + } else { + valuesBuilder.append(String.format("(%d, parse_json('{\"name\": \"User%d\"}'))", i, i)); + } + } + + sql("INSERT INTO %s VALUES %s", TABLE_NAME, valuesBuilder.toString()); + + // rare_field appears in 1/11 rows, should be pruned + // name appears in 11/11 rows and should be kept + GroupType name = + field( + "name", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(name)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testMixedTypeTieBreaking() throws IOException { + StringBuilder valuesBuilder = new StringBuilder(); + for (int i = 1; i <= 10; i++) { + if (i > 1) { + valuesBuilder.append(", "); + } + + if (i <= 5) { + valuesBuilder.append(String.format("(%d, parse_json('{\"val\": %d}'))", i, i)); + } else { + valuesBuilder.append(String.format("(%d, parse_json('{\"val\": \"text%d\"}'))", i, i)); + } + } + + sql("INSERT INTO %s VALUES %s", TABLE_NAME, valuesBuilder.toString()); + + // 5 ints + 5 strings is a tie so STRING wins (higher TIE_BREAK_PRIORITY) + GroupType val = + field( + "val", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(val)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + } + + @TestTemplate + public void testFieldOnlyAfterBuffer() throws IOException { + getTableEnv() + .getConfig() + .getConfiguration() + .setString("table.exec.resource.default-parallelism", "1"); + + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + + sql( + "CREATE TEMPORARY VIEW tmp_source AS " + + "SELECT * FROM (VALUES " + + "(1, parse_json('{\"name\": \"Alice\"}')), " + + "(2, parse_json('{\"name\": \"Bob\"}')), " + + "(3, parse_json('{\"name\": \"Charlie\"}')), " + + "(4, parse_json('{\"name\": \"David\", \"score\": 95}')), " + + "(5, parse_json('{\"name\": \"Eve\", \"score\": 88}')), " + + "(6, parse_json('{\"name\": \"Frank\", \"score\": 72}')), " + + "(7, parse_json('{\"name\": \"Grace\", \"score\": 91}'))" + + ") AS t(id, address)"); + + sql("INSERT INTO %s SELECT id, address FROM tmp_source ORDER BY id", TABLE_NAME); + + // Schema is determined from buffer (rows 1-3) which only has "name". + // "score" is not shredded + GroupType name = + field( + "name", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(name)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + + // Verify all data round-trips despite "score" not being shredded + List rows = + sql( + "SELECT id, JSON_VALUE(address, '$.name')," + + " JSON_VALUE(address, '$.score' returning int)" + + " FROM %s ORDER BY id", + TABLE_NAME); + assertThat(rows).hasSize(7); + assertThat(rows.get(0).getField(1)).isEqualTo("Alice"); + assertThat(rows.get(0).getField(2)).isNull(); + assertThat(rows.get(3).getField(1)).isEqualTo("David"); + assertThat(rows.get(3).getField(2)).isEqualTo(95); + assertThat(rows.get(6).getField(1)).isEqualTo("Grace"); + assertThat(rows.get(6).getField(2)).isEqualTo(91); + + getTableEnv() + .getConfig() + .getConfiguration() + .setString("table.exec.resource.default-parallelism", "4"); + sql("DROP TEMPORARY VIEW IF EXISTS tmp_source"); + } + + @TestTemplate + public void testCrossFileDifferentShreddedType() throws IOException { + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + + // File 1: "score" is always integer → shredded as INT8 + String batch1 = + "(1, parse_json('{\"score\": 95}'))," + + " (2, parse_json('{\"score\": 88}'))," + + " (3, parse_json('{\"score\": 72}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, batch1); + + // Verify file 1 schema: score shredded as INT8 + GroupType scoreInt = + field( + "score", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8, true))); + MessageType expectedSchema1 = + parquetSchema(variant("address", 2, Type.Repetition.REQUIRED, objectFields(scoreInt))); + verifyParquetSchema(icebergTable, expectedSchema1); + + // File 2: "score" is always string → shredded as STRING + String batch2 = + "(4, parse_json('{\"score\": \"high\"}'))," + + " (5, parse_json('{\"score\": \"medium\"}'))," + + " (6, parse_json('{\"score\": \"low\"}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, batch2); + + // Query across both files, reader must handle different shredded types + List rows = + sql("SELECT id, json_value(address, '$.score') FROM %s ORDER BY id", TABLE_NAME); + assertThat(rows).hasSize(6); + assertThat(rows.get(0).getField(1)).isEqualTo("95"); + assertThat(rows.get(1).getField(1)).isEqualTo("88"); + assertThat(rows.get(3).getField(1)).isEqualTo("high"); + assertThat(rows.get(5).getField(1)).isEqualTo("low"); + } + + @TestTemplate + public void testAllNullVariantColumn() throws IOException { + + String variantNullAbleTableName = "test_all_null_variant_column"; + sql( + "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')", + variantNullAbleTableName, FileFormat.PARQUET.name()); + + sql( + "INSERT INTO %s VALUES (1, CAST(null AS VARIANT)), (2, CAST(null AS VARIANT)), (3, CAST(null AS VARIANT))", + variantNullAbleTableName); + + // All variant values are SQL NULL, so no shredding should occur + MessageType expectedSchema = parquetSchema(variant("address", 2, Type.Repetition.OPTIONAL)); + Table variantNullAbleTable = + validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, variantNullAbleTableName)); + verifyParquetSchema(variantNullAbleTable, expectedSchema); + + List rows = sql("SELECT id, address FROM %s ORDER BY id", variantNullAbleTableName); + assertThat(rows).hasSize(3); + assertThat(rows.get(0).getField(1)).isNull(); + assertThat(rows.get(1).getField(1)).isNull(); + assertThat(rows.get(2).getField(1)).isNull(); + } + + @TestTemplate + public void testBufferSizeOne() throws IOException { + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='1')", TABLE_NAME); + + sql( + "INSERT INTO %s VALUES " + + "(1, parse_json('{\"name\": \"Alice\", \"age\": 30}'))," + + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," + + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))", + TABLE_NAME); + + // Schema inferred from first row only, should still shred name and age + GroupType age = + field( + "age", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8, true))); + GroupType name = + field( + "name", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(age, name)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); + + List rows = + sql("SELECT id, json_value(address, '$.name') FROM %s ORDER BY id", TABLE_NAME); + assertThat(rows).hasSize(3); + assertThat(rows.get(0).getField(1)).isEqualTo("Alice"); + assertThat(rows.get(2).getField(1)).isEqualTo("Charlie"); + } + + @TestTemplate + public void testDecimalFallbackAfterBuffer() { + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + + // Buffer: scale=2, 3 integer digits -> DECIMAL(5,2) + // Row 4: precision overflow -> fallback to value field + // Row 5: scale overflow -> fallback to value field + // Row 6: fits typed column, scale widened from 1 to 2 via setScale + String values = + " (1, parse_json('{\"val\": 123.45}'))," + + " (2, parse_json('{\"val\": 678.90}'))," + + " (3, parse_json('{\"val\": 999.99}'))," + + " (4, parse_json('{\"val\": 123456.78}'))," + + " (5, parse_json('{\"val\": 1.2345}'))," + + " (6, parse_json('{\"val\": 12.3}'))"; + sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + List rows = + sql( + "SELECT id, CAST(json_value(address, '$.val') AS DECIMAL(10, 4)) FROM %s ORDER BY id", + TABLE_NAME); + assertThat(rows).hasSize(6); + assertThat(rows.get(0).getField(1)).isEqualTo(new BigDecimal("123.4500")); + assertThat(rows.get(3).getField(1)).isEqualTo(new BigDecimal("123456.7800")); + assertThat(rows.get(4).getField(1)).isEqualTo(new BigDecimal("1.2345")); + assertThat(rows.get(5).getField(1)).isEqualTo(new BigDecimal("12.3000")); + } + + private void verifyParquetSchema(Table table, MessageType expectedSchema) throws IOException { + table.refresh(); + try (CloseableIterable tasks = table.newScan().planFiles()) { + assertThat(tasks).isNotEmpty(); + + FileScanTask task = tasks.iterator().next(); + String path = task.file().location(); + + HadoopInputFile inputFile = + HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path), new Configuration()); + + try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) { + MessageType actualSchema = reader.getFileMetaData().getSchema(); + assertThat(actualSchema).isEqualTo(expectedSchema); + } + } + } + + private static MessageType parquetSchema(Type variantTypes) { + return org.apache.parquet.schema.Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT32) + .id(1) + .named("id") + .addFields(variantTypes) + .named("table"); + } + + private static GroupType variant(String name, int fieldId, Type.Repetition repetition) { + return org.apache.parquet.schema.Types.buildGroup(repetition) + .id(fieldId) + .as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION)) + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("metadata") + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("value") + .named(name); + } + + private static GroupType variant( + String name, int fieldId, Type.Repetition repetition, Type shreddedType) { + checkShreddedType(shreddedType); + return org.apache.parquet.schema.Types.buildGroup(repetition) + .id(fieldId) + .as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION)) + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("metadata") + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .named("value") + .addField(shreddedType) + .named(name); + } + + private static Type shreddedPrimitive(PrimitiveType.PrimitiveTypeName primitive) { + return optional(primitive).named("typed_value"); + } + + private static Type shreddedPrimitive( + PrimitiveType.PrimitiveTypeName primitive, LogicalTypeAnnotation annotation) { + return optional(primitive).as(annotation).named("typed_value"); + } + + private static Type shreddedPrimitive( + PrimitiveType.PrimitiveTypeName primitive, int length, LogicalTypeAnnotation annotation) { + return optional(primitive).length(length).as(annotation).named("typed_value"); + } + + private static GroupType objectFields(GroupType... fields) { + for (GroupType fieldType : fields) { + checkField(fieldType); + } + + return org.apache.parquet.schema.Types.buildGroup(Type.Repetition.OPTIONAL) + .addFields(fields) + .named("typed_value"); + } + + private static GroupType field(String name, Type shreddedType) { + checkShreddedType(shreddedType); + return org.apache.parquet.schema.Types.buildGroup(Type.Repetition.REQUIRED) + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .named("value") + .addField(shreddedType) + .named(name); + } + + private static GroupType element(Type shreddedType) { + return field("element", shreddedType); + } + + private static GroupType list(GroupType elementType) { + return org.apache.parquet.schema.Types.optionalList().element(elementType).named("typed_value"); + } + + private static void checkShreddedType(Type shreddedType) { + Preconditions.checkArgument( + shreddedType.getName().equals("typed_value"), + "Invalid shredded type name: %s should be typed_value", + shreddedType.getName()); + Preconditions.checkArgument( + shreddedType.isRepetition(Type.Repetition.OPTIONAL), + "Invalid shredded type repetition: %s should be OPTIONAL", + shreddedType.getRepetition()); + } + + private static void checkField(GroupType fieldType) { + Preconditions.checkArgument( + fieldType.isRepetition(Type.Repetition.REQUIRED), + "Invalid field type repetition: %s should be REQUIRED", + fieldType.getRepetition()); + } + + private List genericRowData() throws IOException { + List genericRowData = Lists.newArrayList(); + try (CloseableIterable combinedScanTasks = + icebergTable.newScan().planTasks()) { + for (CombinedScanTask combinedScanTask : combinedScanTasks) { + try (DataIterator dataIterator = + ReaderUtil.createDataIterator( + combinedScanTask, icebergTable.schema(), icebergTable.schema())) { + while (dataIterator.hasNext()) { + GenericRowData rowData = (GenericRowData) dataIterator.next(); + genericRowData.add(rowData); + } + } + } + } + + return genericRowData; + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java index 90dd6e117ba8..5d41e7e38a9d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java @@ -22,8 +22,8 @@ import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.UnaryOperator; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; @@ -51,7 +51,7 @@ public class ParquetFormatModel extends BaseFormatModel, R, MessageType> { private final boolean isBatchReader; private final VariantShreddingAnalyzer variantAnalyzer; - private final UnaryOperator copyFunc; + private final BiFunction copyFunc; public static ParquetFormatModel, Void, Object> forPositionDeletes() { return new ParquetFormatModel<>( @@ -73,7 +73,7 @@ public static ParquetFormatModel> create( WriterFunction, S, MessageType> writerFunction, ReaderFunction, S, MessageType> readerFunction, VariantShreddingAnalyzer variantAnalyzer, - UnaryOperator copyFunc) { + BiFunction copyFunc) { return new ParquetFormatModel<>( type, schemaType, writerFunction, readerFunction, false, variantAnalyzer, copyFunc); } @@ -92,7 +92,7 @@ private ParquetFormatModel( ReaderFunction readerFunction, boolean isBatchReader, VariantShreddingAnalyzer variantAnalyzer, - UnaryOperator copyFunc) { + BiFunction copyFunc) { super(type, schemaType, writerFunction, readerFunction); this.isBatchReader = isBatchReader; this.variantAnalyzer = variantAnalyzer; @@ -118,7 +118,7 @@ private static class WriteBuilderWrapper implements ModelWriteBuilder, S, MessageType> writerFunction; private final VariantShreddingAnalyzer variantAnalyzer; - private final UnaryOperator copyFunc; + private final BiFunction copyFunc; private Schema schema; private S engineSchema; private FileContent content; @@ -129,7 +129,7 @@ private WriteBuilderWrapper( EncryptedOutputFile outputFile, WriterFunction, S, MessageType> writerFunction, VariantShreddingAnalyzer variantAnalyzer, - UnaryOperator copyFunc) { + BiFunction copyFunc) { this.internal = Parquet.write(outputFile); this.writerFunction = writerFunction; this.variantAnalyzer = variantAnalyzer; @@ -283,7 +283,7 @@ private FileAppender buildShreddedAppender() { throw new UncheckedIOException("Failed to create shredded variant writer", e); } }, - copyFunc); + datum -> copyFunc.apply(datum, engineSchema)); } private static boolean hasVariantColumns(Schema schema) { diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java index 36e254628a6a..64b5088e1583 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java @@ -370,7 +370,7 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) { (icebergSchema, fileSchema, engineSchema, idToConstant) -> GenericParquetReaders.buildReader(icebergSchema, fileSchema), testAnalyzer, - record -> record); + (record, unused) -> record); try (FileAppender appender = model @@ -471,7 +471,7 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) { (icebergSchema, fileSchema, engineSchema, idToConstant) -> GenericParquetReaders.buildReader(icebergSchema, fileSchema), analyzer, - record1 -> record1); + (oriRecord, unused) -> oriRecord); try (FileAppender appender = model diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index 5b7862116aea..2c0763813d35 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -53,7 +53,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - InternalRow::copy)); + (internalRow, structType) -> internalRow.copy())); FormatModelRegistry.register( ParquetFormatModel.create( From 3e17878e86df8ca55f3bcbaa5a920c9506e9845e Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Sat, 9 May 2026 13:27:34 +0800 Subject: [PATCH 02/14] rename SHRED_VARIANTS to PARQUET_SHRED_VARIANTS --- .../main/java/org/apache/iceberg/flink/FlinkWriteConf.java | 4 ++-- .../main/java/org/apache/iceberg/flink/FlinkWriteOptions.java | 4 ++-- .../src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java | 2 +- .../apache/iceberg/flink/TestFlinkVariantShreddingType.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index bc648923f1bd..cb6d9c3b6fc4 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -263,10 +263,10 @@ public Duration tableRefreshInterval() { .parseOptional(); } - public boolean shredVariants() { + public boolean parquetShredVariants() { return confParser .booleanConf() - .option(FlinkWriteOptions.SHRED_VARIANTS.key()) + .option(FlinkWriteOptions.PARQUET_SHRED_VARIANTS.key()) .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS) .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT) .parse(); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index 3d0866213f59..0774b424731e 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -106,8 +106,8 @@ private FlinkWriteOptions() {} public static final ConfigOption UID_SUFFIX = ConfigOptions.key("uid-suffix").stringType().defaultValue(""); - public static final ConfigOption SHRED_VARIANTS = - ConfigOptions.key("shred-variants").booleanType().defaultValue(false); + public static final ConfigOption PARQUET_SHRED_VARIANTS = + ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false); public static final ConfigOption VARIANT_INFERENCE_BUFFER_SIZE = ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(10); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index 6cb9a25bbedf..9e2826adc044 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -130,7 +130,7 @@ public static Map writeProperties( writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); } - writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(conf.shredVariants())); + writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(conf.parquetShredVariants())); writeProperties.put( PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(conf.variantInferenceBufferSize())); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java index 6d8dc572bf03..b368a9e5cb77 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java @@ -72,7 +72,7 @@ public void before() { sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); sql( - "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')", + "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','variant-inference-buffer-size'='10')", TABLE_NAME, FileFormat.PARQUET.name()); icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); } From d8463a69be974d0c0c03806c4bcdf80d27939ced Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Sat, 9 May 2026 13:43:05 +0800 Subject: [PATCH 03/14] fix spark 4.0 --- .../java/org/apache/iceberg/spark/source/SparkFormatModels.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index 5b7862116aea..2c0763813d35 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -53,7 +53,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - InternalRow::copy)); + (internalRow, structType) -> internalRow.copy())); FormatModelRegistry.register( ParquetFormatModel.create( From fc8a53eae1923da7ec403184b1f8871abc272e45 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Sat, 9 May 2026 14:52:58 +0800 Subject: [PATCH 04/14] Fix RowDataSerializer create every row --- .../iceberg/flink/data/FlinkFormatModels.java | 2 +- .../iceberg/parquet/ParquetFormatModel.java | 26 +++++++++++-------- .../parquet/TestParquetDataWriter.java | 4 +-- .../spark/source/SparkFormatModels.java | 2 +- .../spark/source/SparkFormatModels.java | 2 +- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java index c044dd15014d..6dd7c3e14204 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java @@ -36,7 +36,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new FlinkVariantShreddingAnalyzer(), - (row, rowType) -> new RowDataSerializer(rowType).copy(row))); + rowType -> new RowDataSerializer(rowType)::copy)); FormatModelRegistry.register( AvroFormatModel.create( diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java index 5d41e7e38a9d..46db3f3bbdce 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java @@ -22,8 +22,8 @@ import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.UnaryOperator; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; @@ -51,7 +51,7 @@ public class ParquetFormatModel extends BaseFormatModel, R, MessageType> { private final boolean isBatchReader; private final VariantShreddingAnalyzer variantAnalyzer; - private final BiFunction copyFunc; + private final Function> copyFuncFactory; public static ParquetFormatModel, Void, Object> forPositionDeletes() { return new ParquetFormatModel<>( @@ -73,9 +73,9 @@ public static ParquetFormatModel> create( WriterFunction, S, MessageType> writerFunction, ReaderFunction, S, MessageType> readerFunction, VariantShreddingAnalyzer variantAnalyzer, - BiFunction copyFunc) { + Function> copyFuncFactory) { return new ParquetFormatModel<>( - type, schemaType, writerFunction, readerFunction, false, variantAnalyzer, copyFunc); + type, schemaType, writerFunction, readerFunction, false, variantAnalyzer, copyFuncFactory); } public static ParquetFormatModel> create( @@ -92,11 +92,11 @@ private ParquetFormatModel( ReaderFunction readerFunction, boolean isBatchReader, VariantShreddingAnalyzer variantAnalyzer, - BiFunction copyFunc) { + Function> copyFuncFactory) { super(type, schemaType, writerFunction, readerFunction); this.isBatchReader = isBatchReader; this.variantAnalyzer = variantAnalyzer; - this.copyFunc = copyFunc; + this.copyFuncFactory = copyFuncFactory; } @Override @@ -106,7 +106,8 @@ public FileFormat format() { @Override public ModelWriteBuilder writeBuilder(EncryptedOutputFile outputFile) { - return new WriteBuilderWrapper<>(outputFile, writerFunction(), variantAnalyzer, copyFunc); + return new WriteBuilderWrapper<>( + outputFile, writerFunction(), variantAnalyzer, copyFuncFactory); } @Override @@ -118,7 +119,7 @@ private static class WriteBuilderWrapper implements ModelWriteBuilder, S, MessageType> writerFunction; private final VariantShreddingAnalyzer variantAnalyzer; - private final BiFunction copyFunc; + private final Function> copyFuncFactory; private Schema schema; private S engineSchema; private FileContent content; @@ -129,11 +130,11 @@ private WriteBuilderWrapper( EncryptedOutputFile outputFile, WriterFunction, S, MessageType> writerFunction, VariantShreddingAnalyzer variantAnalyzer, - BiFunction copyFunc) { + Function> copyFuncFactory) { this.internal = Parquet.write(outputFile); this.writerFunction = writerFunction; this.variantAnalyzer = variantAnalyzer; - this.copyFunc = copyFunc; + this.copyFuncFactory = copyFuncFactory; } @Override @@ -267,6 +268,9 @@ public FileAppender build() throws IOException { * top-level fields. */ private FileAppender buildShreddedAppender() { + UnaryOperator copyFunc = copyFuncFactory.apply(engineSchema); + Preconditions.checkState(copyFunc != null, "copyFunc must not return null"); + return new BufferedFileAppender<>( bufferSize, bufferedRows -> { @@ -283,7 +287,7 @@ private FileAppender buildShreddedAppender() { throw new UncheckedIOException("Failed to create shredded variant writer", e); } }, - datum -> copyFunc.apply(datum, engineSchema)); + copyFunc); } private static boolean hasVariantColumns(Schema schema) { diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java index 64b5088e1583..fac01e945800 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java @@ -370,7 +370,7 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) { (icebergSchema, fileSchema, engineSchema, idToConstant) -> GenericParquetReaders.buildReader(icebergSchema, fileSchema), testAnalyzer, - (record, unused) -> record); + unused -> oriRecord -> oriRecord); try (FileAppender appender = model @@ -471,7 +471,7 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) { (icebergSchema, fileSchema, engineSchema, idToConstant) -> GenericParquetReaders.buildReader(icebergSchema, fileSchema), analyzer, - (oriRecord, unused) -> oriRecord); + unused -> oriRecord -> oriRecord); try (FileAppender appender = model diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index 2c0763813d35..ae221b01ff42 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -53,7 +53,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - (internalRow, structType) -> internalRow.copy())); + structType -> InternalRow::copy)); FormatModelRegistry.register( ParquetFormatModel.create( diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index 2c0763813d35..ae221b01ff42 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -53,7 +53,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - (internalRow, structType) -> internalRow.copy())); + structType -> InternalRow::copy)); FormatModelRegistry.register( ParquetFormatModel.create( From 3c1ff52d81f799b03f351cc6e9f5f6e7c2e52fed Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Sat, 9 May 2026 14:53:09 +0800 Subject: [PATCH 05/14] move set param to after --- .../flink/TestFlinkVariantShreddingType.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java index b368a9e5cb77..4432831b08aa 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java @@ -48,6 +48,7 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; @@ -77,6 +78,16 @@ public void before() { icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); } + @Override + @AfterEach + public void clean() { + super.clean(); + getTableEnv() + .getConfig() + .getConfiguration() + .setString("table.exec.resource.default-parallelism", "4"); + } + @TestTemplate public void testExcludingNullValue() throws IOException { String values = @@ -580,10 +591,6 @@ public void testFieldOnlyAfterBuffer() throws IOException { assertThat(rows.get(6).getField(1)).isEqualTo("Grace"); assertThat(rows.get(6).getField(2)).isEqualTo(91); - getTableEnv() - .getConfig() - .getConfiguration() - .setString("table.exec.resource.default-parallelism", "4"); sql("DROP TEMPORARY VIEW IF EXISTS tmp_source"); } From 41d832c715c1a717109f29b44ad82bdece3f19f8 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Sat, 9 May 2026 23:19:05 +0800 Subject: [PATCH 06/14] Address aihua's Comment --- .../org/apache/iceberg/flink/FlinkWriteConf.java | 4 ++-- .../apache/iceberg/flink/FlinkWriteOptions.java | 4 ++-- .../org/apache/iceberg/flink/sink/SinkUtil.java | 2 +- .../flink/TestFlinkVariantShreddingType.java | 16 ++++++++-------- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index cb6d9c3b6fc4..25cae484962e 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -272,10 +272,10 @@ public boolean parquetShredVariants() { .parse(); } - public int variantInferenceBufferSize() { + public int parquetVariantInferenceBufferSize() { return confParser .intConf() - .option(FlinkWriteOptions.VARIANT_INFERENCE_BUFFER_SIZE.key()) + .option(FlinkWriteOptions.PARQUET_VARIANT_INFERENCE_BUFFER_SIZE.key()) .tableProperty(TableProperties.PARQUET_VARIANT_BUFFER_SIZE) .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT) .parse(); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index 0774b424731e..a6bb6d340eda 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -109,6 +109,6 @@ private FlinkWriteOptions() {} public static final ConfigOption PARQUET_SHRED_VARIANTS = ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false); - public static final ConfigOption VARIANT_INFERENCE_BUFFER_SIZE = - ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(10); + public static final ConfigOption PARQUET_VARIANT_INFERENCE_BUFFER_SIZE = + ConfigOptions.key("parquet-variant-inference-buffer-size").intType().defaultValue(100); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index 9e2826adc044..d4c3d3beb80f 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -132,7 +132,7 @@ public static Map writeProperties( writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(conf.parquetShredVariants())); writeProperties.put( - PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(conf.variantInferenceBufferSize())); + PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(conf.parquetVariantInferenceBufferSize())); break; case AVRO: diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java index 4432831b08aa..796b313e5cd8 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java @@ -73,7 +73,7 @@ public void before() { sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); sql( - "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','variant-inference-buffer-size'='10')", + "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','parquet-variant-inference-buffer-size'='10')", TABLE_NAME, FileFormat.PARQUET.name()); icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); } @@ -325,7 +325,7 @@ public void testLazyInitializationWithBufferedRows() throws IOException { @TestTemplate public void testColumnIndexTruncateLength() throws IOException { - sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); int customTruncateLength = 10; sql( @@ -480,7 +480,7 @@ public void testArrayOfNullElementsWithShredding() throws IOException { @TestTemplate public void testInfrequentFieldPruning() throws IOException { - sql("ALTER TABLE %s SET('variant-inference-buffer-size'='11')", TABLE_NAME); + sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='11')", TABLE_NAME); StringBuilder valuesBuilder = new StringBuilder(); for (int i = 1; i <= 11; i++) { if (i > 1) { @@ -548,7 +548,7 @@ public void testFieldOnlyAfterBuffer() throws IOException { .getConfiguration() .setString("table.exec.resource.default-parallelism", "1"); - sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); sql( "CREATE TEMPORARY VIEW tmp_source AS " @@ -596,7 +596,7 @@ public void testFieldOnlyAfterBuffer() throws IOException { @TestTemplate public void testCrossFileDifferentShreddedType() throws IOException { - sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); // File 1: "score" is always integer → shredded as INT8 String batch1 = @@ -637,7 +637,7 @@ public void testAllNullVariantColumn() throws IOException { String variantNullAbleTableName = "test_all_null_variant_column"; sql( - "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')", + "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','parquet-variant-inference-buffer-size'='10')", variantNullAbleTableName, FileFormat.PARQUET.name()); sql( @@ -659,7 +659,7 @@ public void testAllNullVariantColumn() throws IOException { @TestTemplate public void testBufferSizeOne() throws IOException { - sql("ALTER TABLE %s SET('variant-inference-buffer-size'='1')", TABLE_NAME); + sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='1')", TABLE_NAME); sql( "INSERT INTO %s VALUES " @@ -693,7 +693,7 @@ public void testBufferSizeOne() throws IOException { @TestTemplate public void testDecimalFallbackAfterBuffer() { - sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); + sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); // Buffer: scale=2, 3 integer digits -> DECIMAL(5,2) // Row 4: precision overflow -> fallback to value field From 41eeeeb27c4591e6eaeb8e3208a14d2cd7829c1f Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Sun, 10 May 2026 13:58:22 +0800 Subject: [PATCH 07/14] Address Comments --- docs/docs/flink-configuration.md | 2 + .../apache/iceberg/flink/FlinkWriteConf.java | 2 +- .../iceberg/flink/FlinkWriteOptions.java | 4 +- .../data/FlinkVariantShreddingAnalyzer.java | 6 +++ .../flink/TestFlinkVariantShreddingType.java | 49 ++++++++++++++----- 5 files changed, 47 insertions(+), 16 deletions(-) diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md index f30b42288896..ea0429fd9f65 100644 --- a/docs/docs/flink-configuration.md +++ b/docs/docs/flink-configuration.md @@ -160,6 +160,8 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | | write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | | uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table | +| parquet-shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write +| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write #### Range distribution statistics type diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 25cae484962e..c03f317978d6 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -275,7 +275,7 @@ public boolean parquetShredVariants() { public int parquetVariantInferenceBufferSize() { return confParser .intConf() - .option(FlinkWriteOptions.PARQUET_VARIANT_INFERENCE_BUFFER_SIZE.key()) + .option(FlinkWriteOptions.VARIANT_INFERENCE_BUFFER_SIZE.key()) .tableProperty(TableProperties.PARQUET_VARIANT_BUFFER_SIZE) .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT) .parse(); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index a6bb6d340eda..c4ab517cf97c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -109,6 +109,6 @@ private FlinkWriteOptions() {} public static final ConfigOption PARQUET_SHRED_VARIANTS = ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false); - public static final ConfigOption PARQUET_VARIANT_INFERENCE_BUFFER_SIZE = - ConfigOptions.key("parquet-variant-inference-buffer-size").intType().defaultValue(100); + public static final ConfigOption VARIANT_INFERENCE_BUFFER_SIZE = + ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(100); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java index 4e39cad89f9d..c89af83f14ff 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java @@ -29,6 +29,10 @@ import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantValue; +/** + * Analyzes Variant fields in Flink {@link RowData} and converts Flink's binary Variant + * representation to Iceberg {@link VariantValue} instances for Variant shredding. + */ public class FlinkVariantShreddingAnalyzer extends VariantShreddingAnalyzer { @Override @@ -38,6 +42,8 @@ protected List extractVariantValues( for (RowData row : bufferedRows) { if (!row.isNullAt(variantFieldIndex)) { + // Flink currently has only BinaryVariant as its Variant implementation, so this analyzer + // intentionally narrows RowData#getVariant's return value to BinaryVariant here. BinaryVariant flinkVariant = (BinaryVariant) row.getVariant(variantFieldIndex); if (flinkVariant != null) { VariantValue variantValue = diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java index 796b313e5cd8..798bb4672206 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java @@ -73,7 +73,7 @@ public void before() { sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); sql( - "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','parquet-variant-inference-buffer-size'='10')", + "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','variant-inference-buffer-size'='10')", TABLE_NAME, FileFormat.PARQUET.name()); icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); } @@ -325,7 +325,7 @@ public void testLazyInitializationWithBufferedRows() throws IOException { @TestTemplate public void testColumnIndexTruncateLength() throws IOException { - sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); int customTruncateLength = 10; sql( @@ -480,7 +480,9 @@ public void testArrayOfNullElementsWithShredding() throws IOException { @TestTemplate public void testInfrequentFieldPruning() throws IOException { - sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='11')", TABLE_NAME); + // This test relies on the current VariantShreddingAnalyzer MIN_FIELD_FREQUENCY threshold of + // 0.10: rare_field appears in 1/11 rows (~0.09), so it should be pruned. + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='11')", TABLE_NAME); StringBuilder valuesBuilder = new StringBuilder(); for (int i = 1; i <= 11; i++) { if (i > 1) { @@ -548,7 +550,7 @@ public void testFieldOnlyAfterBuffer() throws IOException { .getConfiguration() .setString("table.exec.resource.default-parallelism", "1"); - sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); sql( "CREATE TEMPORARY VIEW tmp_source AS " @@ -596,7 +598,7 @@ public void testFieldOnlyAfterBuffer() throws IOException { @TestTemplate public void testCrossFileDifferentShreddedType() throws IOException { - sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); // File 1: "score" is always integer → shredded as INT8 String batch1 = @@ -637,7 +639,7 @@ public void testAllNullVariantColumn() throws IOException { String variantNullAbleTableName = "test_all_null_variant_column"; sql( - "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','parquet-variant-inference-buffer-size'='10')", + "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')", variantNullAbleTableName, FileFormat.PARQUET.name()); sql( @@ -659,7 +661,7 @@ public void testAllNullVariantColumn() throws IOException { @TestTemplate public void testBufferSizeOne() throws IOException { - sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='1')", TABLE_NAME); + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='1')", TABLE_NAME); sql( "INSERT INTO %s VALUES " @@ -692,21 +694,40 @@ public void testBufferSizeOne() throws IOException { } @TestTemplate - public void testDecimalFallbackAfterBuffer() { - sql("ALTER TABLE %s SET('parquet-variant-inference-buffer-size'='3')", TABLE_NAME); + public void testDecimalFallbackAfterBuffer() throws IOException { + getTableEnv() + .getConfig() + .getConfiguration() + .setString("table.exec.resource.default-parallelism", "1"); + + sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); // Buffer: scale=2, 3 integer digits -> DECIMAL(5,2) // Row 4: precision overflow -> fallback to value field // Row 5: scale overflow -> fallback to value field // Row 6: fits typed column, scale widened from 1 to 2 via setScale - String values = - " (1, parse_json('{\"val\": 123.45}'))," + sql( + "CREATE TEMPORARY VIEW tmp_source AS " + + "SELECT * FROM (VALUES " + + " (1, parse_json('{\"val\": 123.45}'))," + " (2, parse_json('{\"val\": 678.90}'))," + " (3, parse_json('{\"val\": 999.99}'))," + " (4, parse_json('{\"val\": 123456.78}'))," + " (5, parse_json('{\"val\": 1.2345}'))," - + " (6, parse_json('{\"val\": 12.3}'))"; - sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); + + " (6, parse_json('{\"val\": 12.3}'))" + + ") AS t(id, address)"); + + sql("INSERT INTO %s SELECT id, address FROM tmp_source ORDER BY id", TABLE_NAME); + + GroupType val = + field( + "val", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.decimalType(2, 5))); + GroupType address = variant("address", 2, Type.Repetition.REQUIRED, objectFields(val)); + MessageType expectedSchema = parquetSchema(address); + + verifyParquetSchema(icebergTable, expectedSchema); List rows = sql( @@ -717,6 +738,8 @@ public void testDecimalFallbackAfterBuffer() { assertThat(rows.get(3).getField(1)).isEqualTo(new BigDecimal("123456.7800")); assertThat(rows.get(4).getField(1)).isEqualTo(new BigDecimal("1.2345")); assertThat(rows.get(5).getField(1)).isEqualTo(new BigDecimal("12.3000")); + + sql("DROP TEMPORARY VIEW IF EXISTS tmp_source"); } private void verifyParquetSchema(Table table, MessageType expectedSchema) throws IOException { From 31a8bc09283d1e7db3f991cf627cb95bafab44da Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Fri, 15 May 2026 16:06:16 +0800 Subject: [PATCH 08/14] Address talatuyarer's Comments --- docs/docs/flink-configuration.md | 4 ++-- .../iceberg/flink/data/FlinkVariantShreddingAnalyzer.java | 6 +----- .../apache/iceberg/flink/TestFlinkVariantShreddingType.java | 2 +- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md index ea0429fd9f65..443d8d12b2e4 100644 --- a/docs/docs/flink-configuration.md +++ b/docs/docs/flink-configuration.md @@ -160,8 +160,8 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | | write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | | uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table | -| parquet-shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write -| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write +| parquet-shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write | +| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write | #### Range distribution statistics type diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java index c89af83f14ff..71d65a6fe1d1 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java @@ -62,10 +62,6 @@ protected List extractVariantValues( @Override protected int resolveColumnIndex(RowType flinkSchema, String columnName) { - try { - return flinkSchema.getFieldIndex(columnName); - } catch (IllegalArgumentException e) { - return -1; - } + return flinkSchema.getFieldIndex(columnName); } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java index 798bb4672206..7e1e07866581 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java @@ -639,7 +639,7 @@ public void testAllNullVariantColumn() throws IOException { String variantNullAbleTableName = "test_all_null_variant_column"; sql( - "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')", + "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','variant-inference-buffer-size'='10')", variantNullAbleTableName, FileFormat.PARQUET.name()); sql( From 8a5c995515ca0a9f67fccc80eec8ac3e71afba99 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Fri, 15 May 2026 22:50:41 +0800 Subject: [PATCH 09/14] Address Peter's Comments --- docs/docs/flink-configuration.md | 2 +- .../apache/iceberg/flink/FlinkWriteConf.java | 2 +- .../iceberg/flink/FlinkWriteOptions.java | 4 +- .../data/FlinkVariantShreddingAnalyzer.java | 23 +- .../flink/TestFlinkVariantShreddingType.java | 318 +++++++++++++----- 5 files changed, 244 insertions(+), 105 deletions(-) diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md index 443d8d12b2e4..f42a6273a9f1 100644 --- a/docs/docs/flink-configuration.md +++ b/docs/docs/flink-configuration.md @@ -160,7 +160,7 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | | write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | | uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table | -| parquet-shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write | +| shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write | | variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write | #### Range distribution statistics type diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index c03f317978d6..fd3fccb224a2 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -266,7 +266,7 @@ public Duration tableRefreshInterval() { public boolean parquetShredVariants() { return confParser .booleanConf() - .option(FlinkWriteOptions.PARQUET_SHRED_VARIANTS.key()) + .option(FlinkWriteOptions.SHRED_VARIANTS.key()) .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS) .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT) .parse(); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index c4ab517cf97c..71e3800131cc 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -106,8 +106,8 @@ private FlinkWriteOptions() {} public static final ConfigOption UID_SUFFIX = ConfigOptions.key("uid-suffix").stringType().defaultValue(""); - public static final ConfigOption PARQUET_SHRED_VARIANTS = - ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false); + public static final ConfigOption SHRED_VARIANTS = + ConfigOptions.key("shred-variants").booleanType().defaultValue(false); public static final ConfigOption VARIANT_INFERENCE_BUFFER_SIZE = ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(100); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java index 71d65a6fe1d1..cfb4d9a55680 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java @@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.variant.BinaryVariant; +import org.apache.flink.types.variant.Variant; import org.apache.iceberg.parquet.VariantShreddingAnalyzer; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.variants.VariantMetadata; @@ -42,17 +43,21 @@ protected List extractVariantValues( for (RowData row : bufferedRows) { if (!row.isNullAt(variantFieldIndex)) { - // Flink currently has only BinaryVariant as its Variant implementation, so this analyzer - // intentionally narrows RowData#getVariant's return value to BinaryVariant here. - BinaryVariant flinkVariant = (BinaryVariant) row.getVariant(variantFieldIndex); + Variant flinkVariant = row.getVariant(variantFieldIndex); if (flinkVariant != null) { - VariantValue variantValue = - VariantValue.from( - VariantMetadata.from( - ByteBuffer.wrap(flinkVariant.getMetadata()).order(ByteOrder.LITTLE_ENDIAN)), - ByteBuffer.wrap(flinkVariant.getValue()).order(ByteOrder.LITTLE_ENDIAN)); + if (flinkVariant instanceof BinaryVariant binaryVariant) { + VariantValue variantValue = + VariantValue.from( + VariantMetadata.from( + ByteBuffer.wrap(binaryVariant.getMetadata()) + .order(ByteOrder.LITTLE_ENDIAN)), + ByteBuffer.wrap(binaryVariant.getValue()).order(ByteOrder.LITTLE_ENDIAN)); - values.add(variantValue); + values.add(variantValue); + } else { + throw new UnsupportedOperationException( + "Not a supported type: " + flinkVariant.getClass()); + } } } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java index 7e1e07866581..e809a47cd262 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java @@ -73,7 +73,17 @@ public void before() { sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); sql( - "CREATE TABLE %s (id int NOT NULL, address variant NOT NULL) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','variant-inference-buffer-size'='10')", + """ + CREATE TABLE %s ( + id int NOT NULL, + address variant NOT NULL + ) WITH ( + 'write.format.default'='%s', + 'format-version'='3', + 'shred-variants'='true', + 'variant-inference-buffer-size'='10' + ) + """, TABLE_NAME, FileFormat.PARQUET.name()); icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); } @@ -91,9 +101,11 @@ public void clean() { @TestTemplate public void testExcludingNullValue() throws IOException { String values = - "(1, parse_json('{\"name\": \"Alice\", \"age\": 30, \"dummy\": null}'))," - + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," - + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))"; + """ + (1, parse_json('{"name": "Alice", "age": 30, "dummy": null}')), + (2, parse_json('{"name": "Bob", "age": 25}')), + (3, parse_json('{"name": "Charlie", "age": 35}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType name = @@ -114,9 +126,11 @@ public void testExcludingNullValue() throws IOException { @TestTemplate public void testConsistentType() throws IOException { String values = - "(1, parse_json('{\"age\": \"25\"}'))," - + " (2, parse_json('{\"age\": 30}'))," - + " (3, parse_json('{\"age\": \"35\"}'))"; + """ + (1, parse_json('{"age": "25"}')), + (2, parse_json('{"age": 30}')), + (3, parse_json('{"age": "35"}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType age = @@ -131,7 +145,12 @@ public void testConsistentType() throws IOException { @TestTemplate public void testPrimitiveType() throws IOException { - String values = "(1, parse_json('123')), (2, parse_json('\"abc\"')), (3, parse_json('12'))"; + String values = + """ + (1, parse_json('123')), + (2, parse_json('"abc"')), + (3, parse_json('12')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType address = @@ -150,7 +169,11 @@ public void testPrimitiveType() throws IOException { @TestTemplate public void testPrimitiveDecimalType() throws IOException { String values = - "(1, parse_json('123.56')), (2, parse_json('\"abc\"')), (3, parse_json('12.56'))"; + """ + (1, parse_json('123.56')), + (2, parse_json('"abc"')), + (3, parse_json('12.56')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType address = @@ -168,9 +191,11 @@ public void testPrimitiveDecimalType() throws IOException { @TestTemplate public void testBooleanType() throws IOException { String values = - "(1, parse_json('{\"active\": true}'))," - + " (2, parse_json('{\"active\": false}'))," - + " (3, parse_json('{\"active\": true}'))"; + """ + (1, parse_json('{"active": true}')), + (2, parse_json('{"active": false}')), + (3, parse_json('{"active": true}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType active = field("active", shreddedPrimitive(PrimitiveType.PrimitiveTypeName.BOOLEAN)); @@ -182,9 +207,11 @@ public void testBooleanType() throws IOException { @TestTemplate public void testDecimalTypeWithInconsistentScales() throws IOException { String values = - "(1, parse_json('{\"price\": 123.456789}'))," - + " (2, parse_json('{\"price\": 678.90}'))," - + " (3, parse_json('{\"price\": 999.99}'))"; + """ + (1, parse_json('{"price": 123.456789}')), + (2, parse_json('{"price": 678.90}')), + (3, parse_json('{"price": 999.99}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType price = @@ -200,9 +227,11 @@ public void testDecimalTypeWithInconsistentScales() throws IOException { @TestTemplate public void testDecimalTypeWithConsistentScales() throws IOException { String values = - "(1, parse_json('{\"price\": 123.45}'))," - + " (2, parse_json('{\"price\": 678.90}'))," - + " (3, parse_json('{\"price\": 999.99}'))"; + """ + (1, parse_json('{"price": 123.45}')), + (2, parse_json('{"price": 678.90}')), + (3, parse_json('{"price": 999.99}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType price = @@ -218,9 +247,11 @@ public void testDecimalTypeWithConsistentScales() throws IOException { @TestTemplate public void testArrayType() throws IOException { String values = - "(1, parse_json('[\"java\", \"scala\", \"python\"]'))," - + " (2, parse_json('[\"rust\", \"go\"]'))," - + " (3, parse_json('[\"javascript\"]'))"; + """ + (1, parse_json('["java", "scala", "python"]')), + (2, parse_json('["rust", "go"]')), + (3, parse_json('["javascript"]')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType arr = @@ -238,9 +269,11 @@ public void testArrayType() throws IOException { public void testNestedArrayType() throws IOException { String values = - "(1, parse_json('{\"tags\": [\"java\", \"scala\", \"python\"]}'))," - + " (2, parse_json('{\"tags\": [\"rust\", \"go\"]}'))," - + " (3, parse_json('{\"tags\": [\"javascript\"]}'))"; + """ + (1, parse_json('{"tags": ["java", "scala", "python"]}')), + (2, parse_json('{"tags": ["rust", "go"]}')), + (3, parse_json('{"tags": ["javascript"]}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType tags = @@ -261,9 +294,11 @@ public void testNestedArrayType() throws IOException { public void testNestedObjectType() throws IOException { String values = - "(1, parse_json('{\"location\": {\"city\": \"Seattle\", \"zip\": 98101}, \"tags\": [\"java\", \"scala\", \"python\"]}'))," - + " (2, parse_json('{\"location\": {\"city\": \"Portland\", \"zip\": 97201}}'))," - + " (3, parse_json('{\"location\": {\"city\": \"NYC\", \"zip\": 10001}}'))"; + """ + (1, parse_json('{"location": {"city": "Seattle", "zip": 98101}, "tags": ["java", "scala", "python"]}')), + (2, parse_json('{"location": {"city": "Portland", "zip": 97201}}')), + (3, parse_json('{"location": {"city": "NYC", "zip": 10001}}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType city = @@ -297,13 +332,15 @@ public void testNestedObjectType() throws IOException { public void testLazyInitializationWithBufferedRows() throws IOException { String values = - "(1, parse_json('{\"name\": \"Alice\", \"age\": 30}'))," - + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," - + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))," - + " (4, parse_json('{\"name\": \"David\", \"age\": 28}'))," - + " (5, parse_json('{\"name\": \"Eve\", \"age\": 32}'))," - + " (6, parse_json('{\"name\": \"Frank\", \"age\": 40}'))," - + " (7, parse_json('{\"name\": \"Grace\", \"age\": 27}'))"; + """ + (1, parse_json('{"name": "Alice", "age": 30}')), + (2, parse_json('{"name": "Bob", "age": 25}')), + (3, parse_json('{"name": "Charlie", "age": 35}')), + (4, parse_json('{"name": "David", "age": 28}')), + (5, parse_json('{"name": "Eve", "age": 32}')), + (6, parse_json('{"name": "Frank", "age": 40}')), + (7, parse_json('{"name": "Grace", "age": 27}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType name = @@ -341,7 +378,13 @@ public void testColumnIndexTruncateLength() throws IOException { String longValue = "A".repeat(20); valuesBuilder.append( String.format( - "(%d, parse_json('{\"description\": \"%s\", \"id\": %d}'))", i, longValue, i)); + """ + (%d, parse_json('{"description": "%s", "id": %d}')) + """ + .trim(), + i, + longValue, + i)); } sql("INSERT INTO %s VALUES %s", TABLE_NAME, valuesBuilder.toString()); @@ -368,10 +411,12 @@ public void testIntegerFamilyPromotion() throws IOException { // Mix of INT8, INT16, INT32, INT64 - should promote to INT64 String values = - "(1, parse_json('{\"value\": 10}'))," - + " (2, parse_json('{\"value\": 1000}'))," - + " (3, parse_json('{\"value\": 100000}'))," - + " (4, parse_json('{\"value\": 10000000000}'))"; + """ + (1, parse_json('{"value": 10}')), + (2, parse_json('{"value": 1000}')), + (3, parse_json('{"value": 100000}')), + (4, parse_json('{"value": 10000000000}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType value = @@ -390,9 +435,11 @@ public void testDecimalFamilyPromotion() throws IOException { // Test that they get promoted to the most capable decimal type observed String values = - "(1, parse_json('{\"value\": 1.5}'))," - + " (2, parse_json('{\"value\": 123.456789}'))," - + " (3, parse_json('{\"value\": 123456789123456.789}'))"; + """ + (1, parse_json('{"value": 1.5}')), + (2, parse_json('{"value": 123.456789}')), + (3, parse_json('{"value": 123456789123456.789}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType value = @@ -411,9 +458,11 @@ public void testDecimalFamilyPromotion() throws IOException { @TestTemplate public void testDataRoundTripWithShredding() throws IOException { String values = - "(1, parse_json('{\"name\": \"Alice\", \"age\": 30}'))," - + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," - + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))"; + """ + (1, parse_json('{"name": "Alice", "age": 30}')), + (2, parse_json('{"name": "Bob", "age": 25}')), + (3, parse_json('{"name": "Charlie", "age": 35}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType name = @@ -434,9 +483,13 @@ public void testDataRoundTripWithShredding() throws IOException { // Verify that we can read the data back correctly List rows = sql( - "SELECT id, JSON_VALUE(address, '$.name')," - + " JSON_VALUE(address, '$.age' RETURNING int)" - + " FROM %s ORDER BY id", + """ + SELECT id, + JSON_VALUE(address, '$.name'), + JSON_VALUE(address, '$.age' RETURNING int) + FROM %s + ORDER BY id + """, TABLE_NAME); assertThat(rows).hasSize(3); assertThat(rows.get(0).getField(0)).isEqualTo(1); @@ -454,7 +507,11 @@ public void testDataRoundTripWithShredding() throws IOException { public void testVariantWithNullValues() throws IOException { String values = - "(1, parse_json('null'))," + " (2, parse_json('null'))," + " (3, parse_json('null'))"; + """ + (1, parse_json('null')), + (2, parse_json('null')), + (3, parse_json('null')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, values); GroupType address = variant("address", 2, Type.Repetition.REQUIRED); @@ -467,8 +524,11 @@ public void testVariantWithNullValues() throws IOException { public void testArrayOfNullElementsWithShredding() throws IOException { sql( - "INSERT INTO %s VALUES (1, parse_json('[null, null, null]')), " - + "(2, parse_json('[null]'))", + """ + INSERT INTO %s VALUES + (1, parse_json('[null, null, null]')), + (2, parse_json('[null]')) + """, TABLE_NAME); // Array elements are all null, element type is null, falls back to unshredded @@ -493,9 +553,21 @@ public void testInfrequentFieldPruning() throws IOException { // Only the first row has rare_field valuesBuilder.append( String.format( - "(%d, parse_json('{\"name\": \"User%d\", \"rare_field\": \"rare\"}'))", i, i)); + """ + (%d, parse_json('{"name": "User%d", "rare_field": "rare"}')) + """ + .trim(), + i, + i)); } else { - valuesBuilder.append(String.format("(%d, parse_json('{\"name\": \"User%d\"}'))", i, i)); + valuesBuilder.append( + String.format( + """ + (%d, parse_json('{"name": "User%d"}')) + """ + .trim(), + i, + i)); } } @@ -523,9 +595,23 @@ public void testMixedTypeTieBreaking() throws IOException { } if (i <= 5) { - valuesBuilder.append(String.format("(%d, parse_json('{\"val\": %d}'))", i, i)); + valuesBuilder.append( + String.format( + """ + (%d, parse_json('{"val": %d}')) + """ + .trim(), + i, + i)); } else { - valuesBuilder.append(String.format("(%d, parse_json('{\"val\": \"text%d\"}'))", i, i)); + valuesBuilder.append( + String.format( + """ + (%d, parse_json('{"val": "text%d"}')) + """ + .trim(), + i, + i)); } } @@ -553,16 +639,18 @@ public void testFieldOnlyAfterBuffer() throws IOException { sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME); sql( - "CREATE TEMPORARY VIEW tmp_source AS " - + "SELECT * FROM (VALUES " - + "(1, parse_json('{\"name\": \"Alice\"}')), " - + "(2, parse_json('{\"name\": \"Bob\"}')), " - + "(3, parse_json('{\"name\": \"Charlie\"}')), " - + "(4, parse_json('{\"name\": \"David\", \"score\": 95}')), " - + "(5, parse_json('{\"name\": \"Eve\", \"score\": 88}')), " - + "(6, parse_json('{\"name\": \"Frank\", \"score\": 72}')), " - + "(7, parse_json('{\"name\": \"Grace\", \"score\": 91}'))" - + ") AS t(id, address)"); + """ + CREATE TEMPORARY VIEW tmp_source AS + SELECT * FROM (VALUES + (1, parse_json('{"name": "Alice"}')), + (2, parse_json('{"name": "Bob"}')), + (3, parse_json('{"name": "Charlie"}')), + (4, parse_json('{"name": "David", "score": 95}')), + (5, parse_json('{"name": "Eve", "score": 88}')), + (6, parse_json('{"name": "Frank", "score": 72}')), + (7, parse_json('{"name": "Grace", "score": 91}')) + ) AS t(id, address) + """); sql("INSERT INTO %s SELECT id, address FROM tmp_source ORDER BY id", TABLE_NAME); @@ -581,9 +669,13 @@ public void testFieldOnlyAfterBuffer() throws IOException { // Verify all data round-trips despite "score" not being shredded List rows = sql( - "SELECT id, JSON_VALUE(address, '$.name')," - + " JSON_VALUE(address, '$.score' returning int)" - + " FROM %s ORDER BY id", + """ + SELECT id, + JSON_VALUE(address, '$.name'), + JSON_VALUE(address, '$.score' RETURNING int) + FROM %s + ORDER BY id + """, TABLE_NAME); assertThat(rows).hasSize(7); assertThat(rows.get(0).getField(1)).isEqualTo("Alice"); @@ -602,9 +694,11 @@ public void testCrossFileDifferentShreddedType() throws IOException { // File 1: "score" is always integer → shredded as INT8 String batch1 = - "(1, parse_json('{\"score\": 95}'))," - + " (2, parse_json('{\"score\": 88}'))," - + " (3, parse_json('{\"score\": 72}'))"; + """ + (1, parse_json('{"score": 95}')), + (2, parse_json('{"score": 88}')), + (3, parse_json('{"score": 72}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, batch1); // Verify file 1 schema: score shredded as INT8 @@ -619,14 +713,23 @@ public void testCrossFileDifferentShreddedType() throws IOException { // File 2: "score" is always string → shredded as STRING String batch2 = - "(4, parse_json('{\"score\": \"high\"}'))," - + " (5, parse_json('{\"score\": \"medium\"}'))," - + " (6, parse_json('{\"score\": \"low\"}'))"; + """ + (4, parse_json('{"score": "high"}')), + (5, parse_json('{"score": "medium"}')), + (6, parse_json('{"score": "low"}')) + """; sql("INSERT INTO %s VALUES %s", TABLE_NAME, batch2); // Query across both files, reader must handle different shredded types List rows = - sql("SELECT id, json_value(address, '$.score') FROM %s ORDER BY id", TABLE_NAME); + sql( + """ + SELECT id, + json_value(address, '$.score') + FROM %s + ORDER BY id + """, + TABLE_NAME); assertThat(rows).hasSize(6); assertThat(rows.get(0).getField(1)).isEqualTo("95"); assertThat(rows.get(1).getField(1)).isEqualTo("88"); @@ -639,11 +742,26 @@ public void testAllNullVariantColumn() throws IOException { String variantNullAbleTableName = "test_all_null_variant_column"; sql( - "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','parquet-shred-variants'='true','variant-inference-buffer-size'='10')", + """ + CREATE TABLE %s ( + id int NOT NULL, + address variant + ) WITH ( + 'write.format.default'='%s', + 'format-version'='3', + 'shred-variants'='true', + 'variant-inference-buffer-size'='10' + ) + """, variantNullAbleTableName, FileFormat.PARQUET.name()); sql( - "INSERT INTO %s VALUES (1, CAST(null AS VARIANT)), (2, CAST(null AS VARIANT)), (3, CAST(null AS VARIANT))", + """ + INSERT INTO %s VALUES + (1, CAST(null AS VARIANT)), + (2, CAST(null AS VARIANT)), + (3, CAST(null AS VARIANT)) + """, variantNullAbleTableName); // All variant values are SQL NULL, so no shredding should occur @@ -664,10 +782,12 @@ public void testBufferSizeOne() throws IOException { sql("ALTER TABLE %s SET('variant-inference-buffer-size'='1')", TABLE_NAME); sql( - "INSERT INTO %s VALUES " - + "(1, parse_json('{\"name\": \"Alice\", \"age\": 30}'))," - + " (2, parse_json('{\"name\": \"Bob\", \"age\": 25}'))," - + " (3, parse_json('{\"name\": \"Charlie\", \"age\": 35}'))", + """ + INSERT INTO %s VALUES + (1, parse_json('{"name": "Alice", "age": 30}')), + (2, parse_json('{"name": "Bob", "age": 25}')), + (3, parse_json('{"name": "Charlie", "age": 35}')) + """, TABLE_NAME); // Schema inferred from first row only, should still shred name and age @@ -687,7 +807,14 @@ public void testBufferSizeOne() throws IOException { verifyParquetSchema(icebergTable, expectedSchema); List rows = - sql("SELECT id, json_value(address, '$.name') FROM %s ORDER BY id", TABLE_NAME); + sql( + """ + SELECT id, + json_value(address, '$.name') + FROM %s + ORDER BY id + """, + TABLE_NAME); assertThat(rows).hasSize(3); assertThat(rows.get(0).getField(1)).isEqualTo("Alice"); assertThat(rows.get(2).getField(1)).isEqualTo("Charlie"); @@ -707,15 +834,17 @@ public void testDecimalFallbackAfterBuffer() throws IOException { // Row 5: scale overflow -> fallback to value field // Row 6: fits typed column, scale widened from 1 to 2 via setScale sql( - "CREATE TEMPORARY VIEW tmp_source AS " - + "SELECT * FROM (VALUES " - + " (1, parse_json('{\"val\": 123.45}'))," - + " (2, parse_json('{\"val\": 678.90}'))," - + " (3, parse_json('{\"val\": 999.99}'))," - + " (4, parse_json('{\"val\": 123456.78}'))," - + " (5, parse_json('{\"val\": 1.2345}'))," - + " (6, parse_json('{\"val\": 12.3}'))" - + ") AS t(id, address)"); + """ + CREATE TEMPORARY VIEW tmp_source AS + SELECT * FROM (VALUES + (1, parse_json('{"val": 123.45}')), + (2, parse_json('{"val": 678.90}')), + (3, parse_json('{"val": 999.99}')), + (4, parse_json('{"val": 123456.78}')), + (5, parse_json('{"val": 1.2345}')), + (6, parse_json('{"val": 12.3}')) + ) AS t(id, address) + """); sql("INSERT INTO %s SELECT id, address FROM tmp_source ORDER BY id", TABLE_NAME); @@ -731,7 +860,12 @@ public void testDecimalFallbackAfterBuffer() throws IOException { List rows = sql( - "SELECT id, CAST(json_value(address, '$.val') AS DECIMAL(10, 4)) FROM %s ORDER BY id", + """ + SELECT id, + CAST(json_value(address, '$.val') AS DECIMAL(10, 4)) + FROM %s + ORDER BY id + """, TABLE_NAME); assertThat(rows).hasSize(6); assertThat(rows.get(0).getField(1)).isEqualTo(new BigDecimal("123.4500")); From cee4e353cd8d3871793fb9ed88998790289eeffa Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Fri, 15 May 2026 22:59:31 +0800 Subject: [PATCH 10/14] Update the Flink config to prioritize the table-level setting. --- .../main/java/org/apache/iceberg/flink/FlinkWriteOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index 71e3800131cc..1fdd6df8d753 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -110,5 +110,5 @@ private FlinkWriteOptions() {} ConfigOptions.key("shred-variants").booleanType().defaultValue(false); public static final ConfigOption VARIANT_INFERENCE_BUFFER_SIZE = - ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(100); + ConfigOptions.key("variant-inference-buffer-size").intType().noDefaultValue(); } From d87d3cadb654850cc8f58f9935fd765acbed564b Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Fri, 15 May 2026 23:11:40 +0800 Subject: [PATCH 11/14] rename to unused --- .../java/org/apache/iceberg/spark/source/SparkFormatModels.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index ae221b01ff42..7277b558b1f3 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -53,7 +53,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - structType -> InternalRow::copy)); + unused -> InternalRow::copy)); FormatModelRegistry.register( ParquetFormatModel.create( From 03fb902d30a63575ad85d28a50ea2a614d675778 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 20 May 2026 09:55:22 +0800 Subject: [PATCH 12/14] rename to unused for spark4.0 --- .../java/org/apache/iceberg/spark/source/SparkFormatModels.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index ae221b01ff42..7277b558b1f3 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -53,7 +53,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - structType -> InternalRow::copy)); + unused -> InternalRow::copy)); FormatModelRegistry.register( ParquetFormatModel.create( From 9480782d0ec0fcb03be943b5008a499e803cecae Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 20 May 2026 10:30:41 +0800 Subject: [PATCH 13/14] Add new create method in ParquetFormatModel --- .../iceberg/flink/data/FlinkFormatModels.java | 5 ++++- .../iceberg/parquet/ParquetFormatModel.java | 21 +++++++++++++++++++ .../parquet/TestParquetDataWriter.java | 8 ++++--- .../spark/source/SparkFormatModels.java | 4 +++- .../spark/source/SparkFormatModels.java | 4 +++- 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java index 6dd7c3e14204..747a4618682c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink.data; +import java.util.function.Function; +import java.util.function.UnaryOperator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.RowType; @@ -36,7 +38,8 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new FlinkVariantShreddingAnalyzer(), - rowType -> new RowDataSerializer(rowType)::copy)); + (Function>) + rowType -> new RowDataSerializer(rowType)::copy)); FormatModelRegistry.register( AvroFormatModel.create( diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java index 46db3f3bbdce..fae128ba893a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java @@ -67,6 +67,27 @@ public static ParquetFormatModel> create( type, schemaType, writerFunction, readerFunction, false, null, null); } + /** + * @deprecated Will be removed in 1.12.0; use {@link #create(Class, Class, WriterFunction, + * ReaderFunction, VariantShreddingAnalyzer, Function)} instead. + */ + @Deprecated + public static ParquetFormatModel> create( + Class type, + Class schemaType, + WriterFunction, S, MessageType> writerFunction, + ReaderFunction, S, MessageType> readerFunction, + VariantShreddingAnalyzer variantAnalyzer, + UnaryOperator copyFunc) { + return create( + type, + schemaType, + writerFunction, + readerFunction, + variantAnalyzer, + (Function>) unused -> copyFunc); + } + public static ParquetFormatModel> create( Class type, Class schemaType, diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java index fac01e945800..547ada3e5355 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java @@ -26,6 +26,8 @@ import java.nio.file.Path; import java.util.List; import java.util.Optional; +import java.util.function.Function; +import java.util.function.UnaryOperator; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; @@ -370,7 +372,7 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) { (icebergSchema, fileSchema, engineSchema, idToConstant) -> GenericParquetReaders.buildReader(icebergSchema, fileSchema), testAnalyzer, - unused -> oriRecord -> oriRecord); + (Function>) unused -> record -> record); try (FileAppender appender = model @@ -401,7 +403,7 @@ public void testWriteBuilderReturnsDirectAppenderWithNullAnalyzer() throws IOExc (icebergSchema, fileSchema, engineSchema, idToConstant) -> GenericParquetReaders.buildReader(icebergSchema, fileSchema), null, - null); + (Function>) null); try (FileAppender appender = model @@ -471,7 +473,7 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) { (icebergSchema, fileSchema, engineSchema, idToConstant) -> GenericParquetReaders.buildReader(icebergSchema, fileSchema), analyzer, - unused -> oriRecord -> oriRecord); + (Function>) unused -> record1 -> record1); try (FileAppender appender = model diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index 7277b558b1f3..15c96ff4cd73 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.source; +import java.util.function.Function; +import java.util.function.UnaryOperator; import org.apache.iceberg.avro.AvroFormatModel; import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.orc.ORCFormatModel; @@ -53,7 +55,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - unused -> InternalRow::copy)); + (Function>) unused -> InternalRow::copy)); FormatModelRegistry.register( ParquetFormatModel.create( diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index 7277b558b1f3..15c96ff4cd73 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.source; +import java.util.function.Function; +import java.util.function.UnaryOperator; import org.apache.iceberg.avro.AvroFormatModel; import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.orc.ORCFormatModel; @@ -53,7 +55,7 @@ public static void register() { (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), new SparkVariantShreddingAnalyzer(), - unused -> InternalRow::copy)); + (Function>) unused -> InternalRow::copy)); FormatModelRegistry.register( ParquetFormatModel.create( From 60ccdbe2eb29d0675f9c05f30c72716127e06205 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Thu, 21 May 2026 14:29:33 +0800 Subject: [PATCH 14/14] Adjust deprecated doc and add check for copyFuncFactory --- .../java/org/apache/iceberg/parquet/ParquetFormatModel.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java index fae128ba893a..9a4a62cae612 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java @@ -68,7 +68,7 @@ public static ParquetFormatModel> create( } /** - * @deprecated Will be removed in 1.12.0; use {@link #create(Class, Class, WriterFunction, + * @deprecated Will be removed in 1.13.0; use {@link #create(Class, Class, WriterFunction, * ReaderFunction, VariantShreddingAnalyzer, Function)} instead. */ @Deprecated @@ -289,6 +289,7 @@ public FileAppender build() throws IOException { * top-level fields. */ private FileAppender buildShreddedAppender() { + Preconditions.checkState(copyFuncFactory != null, "copyFuncFactory must not be null"); UnaryOperator copyFunc = copyFuncFactory.apply(engineSchema); Preconditions.checkState(copyFunc != null, "copyFunc must not return null");