Flink: Support writing shredded variant in Flink#15596

Open
Guosmilesmile wants to merge 13 commits into apache:main from Guosmilesmile:flink_shredded_varisnt_fileformat

Conversation

@Guosmilesmile
Contributor

@Guosmilesmile Guosmilesmile commented Mar 12, 2026

This PR adds support in Flink for writing shredded variant data to Iceberg tables. It is based on #14297 and will be adjusted in sync with it.

@Guosmilesmile Guosmilesmile marked this pull request as draft March 12, 2026 08:09
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 15ff223 to 5b448b9 Compare March 12, 2026 08:22
@github-actions github-actions Bot removed the ORC label Mar 12, 2026
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 2 times, most recently from 8f6198a to b03caf6 Compare March 12, 2026 08:59
@Guosmilesmile Guosmilesmile changed the title Core,Flink: Support writing shredded variant in Flink Flink: Support writing shredded variant in Flink Mar 12, 2026
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 3 times, most recently from 88045e1 to cbfa8c2 Compare March 13, 2026 07:17
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from fae2814 to f3a2fba Compare March 24, 2026 05:50
@github-actions github-actions Bot added the core label Mar 24, 2026
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 4 times, most recently from b07b00b to c95d78f Compare March 24, 2026 08:36
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 3 times, most recently from fc8c45a to b116f25 Compare April 1, 2026 01:50
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from b116f25 to 650cb7a Compare April 10, 2026 09:42
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 770d9c4 to 7d48389 Compare May 7, 2026 06:55
@Guosmilesmile Guosmilesmile marked this pull request as ready for review May 7, 2026 08:34
@Guosmilesmile
Contributor Author

Hi @aihuaxu @nssalian @pvary @mxm. Since the Spark part has been merged, the Flink part has been adjusted accordingly. If you have time, please help review it.

Thanks!
GuoYu.

Comment on lines +270 to +271
.tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
.defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
Contributor

How will we handle when ORC supports shredding variants?

Contributor Author

Good catch. I renamed shred-variants to parquet-shred-variants to make clear that this feature only supports Parquet. If ORC supports it later, we can add another config.

Contributor

Let's do parquet for now since we followed that pattern for the Spark implementation.

FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new FlinkVariantShreddingAnalyzer(),
(row, rowType) -> new RowDataSerializer(rowType).copy(row)));
Contributor

Isn't this costly to recreate every time when we copy a row?

Contributor Author

It does add cost, but without copying, buffered data gets corrupted. We ran into this during early development, and the unit tests can reproduce it.

Contributor

Can we reuse the RowDataSerializer?

Contributor

With the current BiFunction, (row, rowType) -> new RowDataSerializer(rowType).copy(row) creates a new RowDataSerializer for every buffered row (default buffer = 100). This construction is not free, as it involves walking rowType.getChildren(), building a TypeSerializer[] via InternalSerializers.create, a BinaryRowDataSerializer, and a RowData.FieldGetter[]. Since the engine schema is fixed for the entire file, a factory allows us to build it once and reuse it. Using the Factory Pattern, we can avoid recreating the serializer for a given table schema with every incoming record.

Contributor Author

Yes, we can use Function<S, UnaryOperator<D>> instead of BiFunction<D, S, D> to implement this.

Contributor

+1. We should be able to reuse the RowDataSerializer so we don't need to create a new instance for every row.
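A minimal sketch of the two shapes discussed in this thread, using stand-in types instead of Flink classes. `CountingSerializer` is a hypothetical placeholder for an expensive-to-build serializer such as RowDataSerializer, and `copyWithBiFunction`, `copyWithFactory`, and `countConstructions` are illustrative names that do not appear in the PR. It only demonstrates how often the serializer is constructed under each signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class CopyFactorySketch {
  /** Hypothetical stand-in for an expensive serializer; counts how often it is built. */
  public static final class CountingSerializer {
    static int constructed = 0;

    CountingSerializer(String schema) {
      constructed++; // stands in for the per-schema setup work
    }

    String copy(String row) {
      return new String(row); // defensive copy of the buffered value
    }
  }

  /** BiFunction<D, S, D> shape: the schema is passed along with every row. */
  public static List<String> copyWithBiFunction(
      List<String> rows, String schema, BiFunction<String, String, String> copyFunc) {
    List<String> buffered = new ArrayList<>();
    for (String row : rows) {
      buffered.add(copyFunc.apply(row, schema));
    }
    return buffered;
  }

  /** Function<S, UnaryOperator<D>> shape: the schema is applied once, the operator is reused. */
  public static List<String> copyWithFactory(
      List<String> rows, String schema, Function<String, UnaryOperator<String>> copyFactory) {
    UnaryOperator<String> copyFunc = copyFactory.apply(schema); // built once for all rows
    List<String> buffered = new ArrayList<>();
    for (String row : rows) {
      buffered.add(copyFunc.apply(row));
    }
    return buffered;
  }

  /** Returns {constructions with the BiFunction, constructions with the factory} for n rows. */
  public static int[] countConstructions(int n) {
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      rows.add("row-" + i);
    }

    CountingSerializer.constructed = 0;
    copyWithBiFunction(rows, "schema", (row, schema) -> new CountingSerializer(schema).copy(row));
    int perRow = CountingSerializer.constructed;

    CountingSerializer.constructed = 0;
    copyWithFactory(
        rows,
        "schema",
        schema -> {
          CountingSerializer serializer = new CountingSerializer(schema);
          return serializer::copy;
        });
    int perSchema = CountingSerializer.constructed;

    return new int[] {perRow, perSchema};
  }
}
```

With a buffer of 100 rows, the first shape builds the stand-in serializer 100 times and the second builds it once, which is the saving the reviewers are asking for.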

@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 63ae5ae to 0f2ae10 Compare May 9, 2026 05:33
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 2 times, most recently from 8bf41e8 to b373680 Compare May 11, 2026 02:55

@Override
protected int resolveColumnIndex(RowType flinkSchema, String columnName) {
try {
Contributor

This try/catch is redundant. The catch is unreachable; the whole method should just be return flinkSchema.getFieldIndex(columnName);. I checked the RowType code: it does not throw IllegalArgumentException, it returns -1. https://github.com/apache/flink/blob/release-2.1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java#L187

Contributor Author

Good catch, I've removed the try/catch.
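A rough sketch of the simplified method, written against a stand-in type so it runs without Flink on the classpath. `FieldNames` is a hypothetical placeholder for RowType; it is assumed to mirror the behavior described in the review, where an unknown field name yields -1 rather than an exception.

```java
import java.util.List;

public class ColumnIndexSketch {
  /** Hypothetical stand-in for RowType: lookup returns -1 for an unknown field name. */
  public static final class FieldNames {
    private final List<String> names;

    public FieldNames(List<String> names) {
      this.names = List.copyOf(names);
    }

    public int getFieldIndex(String fieldName) {
      return names.indexOf(fieldName); // List.indexOf returns -1 when the name is absent
    }
  }

  /** No try/catch needed: a missing column is reported through the -1 return value. */
  public static int resolveColumnIndex(FieldNames flinkSchema, String columnName) {
    return flinkSchema.getFieldIndex(columnName);
  }
}
```

Callers then check for a negative index instead of catching an exception.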


String variantNullAbleTableName = "test_all_null_variant_column";
sql(
"CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')",
Contributor

This test uses the wrong setting, shred-variants, so shredding is never enabled. The test then asserts an unshredded schema and passes for the wrong reason.

Contributor Author

Yeah, you're right. The config was wrong; I've updated it. The test passed because we were inserting all nulls, so the result is identical with or without shredding enabled. The point of this test is just to verify that shredding handles nulls correctly. Good catch, thanks!

Comment thread docs/docs/flink-configuration.md Outdated
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
| uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table |
| parquet-shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write
Contributor

Both new rows are missing the trailing |

Contributor Author

Good catch! Added.

@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 0e376f4 to 4484b2b Compare May 15, 2026 08:19
Comment thread docs/docs/flink-configuration.md
@Guosmilesmile
Contributor Author

Guosmilesmile commented May 15, 2026

I found these configs in Spark:

spark.sql.iceberg.shred-variants
spark.sql.iceberg.variant-inference-buffer-size
shred-variants
variant-inference-buffer-size

All of them already hide the format-related info, while TableProperties keeps the parquet prefix. So I've adjusted the configs on our side to align with Spark's approach: Flink now uses shred-variants, which overrides the table-level configs write.parquet.shred-variants and write.orc.shred-variants (if/when ORC supports it).

On the code side, we can implement it like this:

public boolean parquetShredVariants() {
  return confParser
      .booleanConf()
      .option(FlinkWriteOptions.SHRED_VARIANTS.key())
      .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
      .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
      .parse();
}

public boolean orcShredVariants() {
  return confParser
      .booleanConf()
      .option(FlinkWriteOptions.SHRED_VARIANTS.key())
      .tableProperty(TableProperties.ORC_SHRED_VARIANTS)
      .defaultValue(TableProperties.ORC_SHRED_VARIANTS_DEFAULT)
      .parse();
}
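The precedence implied by the parser chain above can be sketched with plain maps. This is an illustration only: `shredVariants` is a hypothetical helper, the default of `false` is an assumption, and the real conf parser may consult additional sources that are ignored here.

```java
import java.util.Map;

public class ShredVariantsConfSketch {
  static final String WRITE_OPTION = "shred-variants";
  static final String TABLE_PROPERTY = "write.parquet.shred-variants";
  static final boolean ASSUMED_DEFAULT = false; // assumption, not taken from the PR

  /** Resolves the flag in order: write option, then table property, then default. */
  public static boolean shredVariants(
      Map<String, String> writeOptions, Map<String, String> tableProperties) {
    String fromOption = writeOptions.get(WRITE_OPTION);
    if (fromOption != null) {
      return Boolean.parseBoolean(fromOption); // per-write override wins
    }

    String fromTable = tableProperties.get(TABLE_PROPERTY);
    if (fromTable != null) {
      return Boolean.parseBoolean(fromTable); // table-level setting
    }

    return ASSUMED_DEFAULT;
  }
}
```

In this sketch a write option of `shred-variants=true` enables shredding even when the table sets `write.parquet.shred-variants=false`, and a misspelled option key is silently ignored, so resolution falls through to the table property or the default.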

@Guosmilesmile
Contributor Author

Hi @pvary @talatuyarer @nssalian, I'd be grateful if you could take a look when you have time.

@pvary
Contributor

pvary commented May 19, 2026

I don't have more comments.
Anybody else?

SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new SparkVariantShreddingAnalyzer(),
InternalRow::copy));
structType -> InternalRow::copy));
Contributor

@nssalian nssalian May 19, 2026

@Guosmilesmile could you update the Spark 4.0 code with the above suggestion as well?

@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from d0fe6b7 to 03fb902 Compare May 20, 2026 02:00
@Guosmilesmile
Contributor Author

@pvary @talatuyarer @nssalian @aihuaxu Hey all, I rebased main but ran into some CI failures. Looks like a new check was added recently that doesn't allow modifying the ParquetFormatModel parameter types directly.

As a workaround, I added a new method createWithCopyFuncFactory in ParquetFormatModel. The original create method now delegates to it, so the Spark code stays untouched, while FlinkFormatModels calls createWithCopyFuncFactory explicitly.

Would really appreciate it if you could help take another look at these changes. Thanks a lot!

java.method.parameterTypeChanged: The type of the parameter changed from 'java.util.function.UnaryOperator<D extends java.lang.Object>' to 'java.util.function.Function<S extends java.lang.Object, java.util.function.UnaryOperator<D extends java.lang.Object>>'.

old: parameter <D, S> org.apache.iceberg.parquet.ParquetFormatModel<D, S, org.apache.iceberg.parquet.ParquetValueReader<?>> org.apache.iceberg.parquet.ParquetFormatModel<D, S, R>::create(java.lang.Class<D>, java.lang.Class<S>, org.apache.iceberg.formats.BaseFormatModel.WriterFunction<org.apache.iceberg.parquet.ParquetValueWriter<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.formats.BaseFormatModel.ReaderFunction<org.apache.iceberg.parquet.ParquetValueReader<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.parquet.VariantShreddingAnalyzer<D, S>, ===java.util.function.UnaryOperator<D>===)
new: parameter <D, S> org.apache.iceberg.parquet.ParquetFormatModel<D, S, org.apache.iceberg.parquet.ParquetValueReader<?>> org.apache.iceberg.parquet.ParquetFormatModel<D, S, R>::create(java.lang.Class<D>, java.lang.Class<S>, org.apache.iceberg.formats.BaseFormatModel.WriterFunction<org.apache.iceberg.parquet.ParquetValueWriter<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.formats.BaseFormatModel.ReaderFunction<org.apache.iceberg.parquet.ParquetValueReader<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.parquet.VariantShreddingAnalyzer<D, S>, ===java.util.function.Function<S, java.util.function.UnaryOperator<D>>===)

https://github.com/apache/iceberg/actions/runs/26136650474/job/76873207059?pr=15596

@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 0cdb80e to e8ad73e Compare May 20, 2026 07:45
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from e8ad73e to 9480782 Compare May 20, 2026 07:50
@Guosmilesmile
Contributor Author

After discussing with @pvary, we decided to keep the same create name and deprecate the old method instead.
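A minimal sketch of that approach, assuming a much-reduced stand-in for the format model. `Model` and its single-argument `create` overloads are hypothetical; the real ParquetFormatModel.create takes several more parameters, as the CI output above shows. The old `UnaryOperator<D>` overload stays in place, marked deprecated, and delegates to the new `Function<S, UnaryOperator<D>>` overload.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class DeprecatedOverloadSketch {
  /** Hypothetical reduced model: D is the row type, S is the engine schema type. */
  public static final class Model<D, S> {
    private final Function<S, UnaryOperator<D>> copyFuncFactory;

    private Model(Function<S, UnaryOperator<D>> copyFuncFactory) {
      this.copyFuncFactory = copyFuncFactory;
    }

    /** New overload: the copy function is derived from the engine schema. */
    public static <D, S> Model<D, S> create(Function<S, UnaryOperator<D>> copyFuncFactory) {
      return new Model<>(copyFuncFactory);
    }

    /** Old overload, kept so existing callers compile; it ignores the schema. */
    @Deprecated
    public static <D, S> Model<D, S> create(UnaryOperator<D> copyFunc) {
      Function<S, UnaryOperator<D>> factory = schema -> copyFunc;
      return create(factory);
    }

    /** Applies the schema to the factory, then copies the row. */
    public D copy(S schema, D row) {
      return copyFuncFactory.apply(schema).apply(row);
    }
  }
}
```

The sketch passes explicitly typed variables to `create` so overload resolution is unambiguous; whether untyped lambdas or method references resolve cleanly against both overloads would need checking in the real code.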
