Flink: Support writing shredded variant in Flink#15596
Conversation
15ff223 to
5b448b9
Compare
8f6198a to
b03caf6
Compare
88045e1 to
cbfa8c2
Compare
fae2814 to
f3a2fba
Compare
b07b00b to
c95d78f
Compare
fc8c45a to
b116f25
Compare
b116f25 to
650cb7a
Compare
770d9c4 to
7d48389
Compare
| .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS) | ||
| .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT) |
There was a problem hiding this comment.
How will we handle when ORC supports shredding variants?
There was a problem hiding this comment.
Good catch . I rename shred-variants to parquet-shred-variants to clarify this feature is only support parquet . If orc support this, then we can add another config.
There was a problem hiding this comment.
Let's do parquet for now since we followed that pattern for the Spark implementation.
| FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); | ||
| FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), | ||
| new FlinkVariantShreddingAnalyzer(), | ||
| (row, rowType) -> new RowDataSerializer(rowType).copy(row))); |
There was a problem hiding this comment.
Isn't this costly to recreate every time when we copy a row?
There was a problem hiding this comment.
It will increase the cost, but without copying, there would be issues with data corruption when buffer data. We ran into this during early development, and the unit tests can reproduce it.
There was a problem hiding this comment.
Can we reuse the RowDataSerializer?
There was a problem hiding this comment.
With the current BiFunction, (row, rowType) -> new RowDataSerializer(rowType).copy(row) creates a new RowDataSerializer for every buffered row (default buffer = 100). This construction is not free, as it involves walking rowType.getChildren(), building a TypeSerializer[] via InternalSerializers.create, a BinaryRowDataSerializer, and a RowData.FieldGetter[]. Since the engine schema is fixed for the entire file, a factory allows us to build it once and reuse it. Using the Factory Pattern, we can avoid recreating the serializer for a given table schema with every incoming record.
There was a problem hiding this comment.
Yes, we can use Function<S, UnaryOperator<D>> instead of BiFunction<D, S, D> to implement this.
There was a problem hiding this comment.
+1. We should be able to reuse RowDataSerializer so we don't need to create new instance for every row.
63ae5ae to
0f2ae10
Compare
8bf41e8 to
b373680
Compare
|
|
||
| @Override | ||
| protected int resolveColumnIndex(RowType flinkSchema, String columnName) { | ||
| try { |
There was a problem hiding this comment.
This is redundant try catch. The catch is unreachable; the whole method should just be return
flinkSchema.getFieldIndex(columnName); I checked RowType code it does not throw IllegalArgumentException. It returns -1 https://github.com/apache/flink/blob/release-2.1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java#L187
There was a problem hiding this comment.
Good Catche, remove try catch now.
|
|
||
| String variantNullAbleTableName = "test_all_null_variant_column"; | ||
| sql( | ||
| "CREATE TABLE %s (id int NOT NULL, address variant) with ('write.format.default'='%s','format-version'='3','shred-variants'='true','variant-inference-buffer-size'='10')", |
There was a problem hiding this comment.
This test uses wrong settings shred-variants and shredding is never enabled. The test then asserts an unshredded schema and passes for the wrong reason.
There was a problem hiding this comment.
Yeah, you're right. The config was wrong, I've already updated it. The tests passed because we were inserting all nulls, so the result should be identical with or without shredding enabled. The whole point here was just to verify that shredding handles nulls correctly. Good catch, thanks!
| | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | | ||
| | write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | | ||
| | uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table | | ||
| | parquet-shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write |
There was a problem hiding this comment.
Both new rows are missing the trailing |
0e376f4 to
4484b2b
Compare
|
I found these configs in Spark:
All of them already hide the format-related info, while TableProperties keeps the parquet prefix. So I've adjusted the configs on our side to align with Spark's approach — Flink now uses shred-variants, which overrides the table-level configs write.parquet.shred-variants and write.orc.shred-variants (if/when ORC supports it). On the code side, we can implement it like this: |
|
Hi @pvary @talatuyarer @nssalian , I'd be grateful if you could take a look when you have time. |
|
I don't have more comments. |
| SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), | ||
| new SparkVariantShreddingAnalyzer(), | ||
| InternalRow::copy)); | ||
| structType -> InternalRow::copy)); |
There was a problem hiding this comment.
@Guosmilesmile could you update the Spark 4.0 with the above suggestion as well
d0fe6b7 to
03fb902
Compare
|
@pvary @talatuyarer @nssalian @aihuaxu Hey all, I rebased main but ran into some CI failures. Looks like a new check was added recently that doesn't allow modifying the ParquetFormatModel parameter types directly. As a workaround, I added a new method createWithCopyFuncFactory in ParquetFormatModel. The original create method now delegates to it, so the Spark code stays untouched, while FlinkFormatModels calls createWithCopyFuncFactory explicitly. Would really appreciate it if you could help take another look at these changes. Thanks a lot! https://github.com/apache/iceberg/actions/runs/26136650474/job/76873207059?pr=15596 |
0cdb80e to
e8ad73e
Compare
e8ad73e to
9480782
Compare
|
After discussing with @pvary , we decided to keep the same create name and go with deprecating the old method instead. |
This PR is mainly to add support in Flink for writing shredding-variant data to Iceberg tables, based on #14297.
This PR is based on #14297 and will be adjusted in sync with it.