Core, Data, Spark: Moving Spark to use the new FormatModel API #15328

pvary merged 7 commits into apache:main
Conversation
Resolved review thread (outdated) on spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
Can we run the Spark benchmarks to see how they turn out after these changes? https://github.com/apache/iceberg/tree/main/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark
Added some new tests for Parquet. The differences are barely noticeable in either direction; there should not be any real difference, as the resulting readers and writers use the same code.
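For reference, benchmarks like these can also be launched programmatically through JMH's Runner API rather than the project's Gradle jmh task; a minimal sketch (the class-name regex and fork count are illustrative choices, not project conventions):

```java
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RunSparkParquetBenchmarks {
  public static void main(String[] args) throws RunnerException {
    // Select the writer benchmarks touched by this PR by class-name regex.
    Options opts =
        new OptionsBuilder()
            .include("SparkParquetWritersFlatDataBenchmark")
            .forks(1)
            .build();
    new Runner(opts).run();
  }
}
```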
| .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") | ||
| .set("spark.sql.caseSensitive", "false") | ||
| .set("spark.sql.parquet.fieldId.write.enabled", "false") | ||
| .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") |
These tests were failing with Spark 4.1, but it's probably not worth creating a new PR for this.
I'm okay with this since it isn't production code. It's unlikely to cause problems when cherry-picking.
| .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") | ||
| .set("spark.sql.caseSensitive", "false") | ||
| .set("spark.sql.parquet.fieldId.write.enabled", "false") | ||
| .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") |
There was a problem hiding this comment.
These tests were failing with Spark 4.1, but probably doesn't worth to create a new PR for this.
There was a problem hiding this comment.
Should we file an issue to track the underlying Spark 4.1 test failure, so we can fix the root cause later?
I think both "true" and "false" are fine; the issue was that the config was not set at all.
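For illustration, the shape of the fix is simply pinning the config explicitly in the test session setup, whichever value is chosen; a minimal sketch, assuming a local SparkSession is how the benchmark boots Spark:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

class BenchmarkSessionSketch {
  // Minimal sketch: the failures came from the config being left unset under
  // Spark 4.1, so the setup now pins it explicitly; either "true" or "false"
  // would do.
  static SparkSession create() {
    SparkConf conf =
        new SparkConf()
            .setMaster("local[2]")
            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false");
    return SparkSession.builder().config(conf).getOrCreate();
  }
}
```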
Resolved review thread (outdated) on ...c/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
We mentioned this PR in the Iceberg <> Spark community sync today. Would be great to get some more 👀 on it!
```java
@Benchmark
@Threads(1)
public void readUsingRegistryReader(Blackhole blackHole) throws IOException {
```
Do we need to test the direct method vs the registry method? I would expect this to replace the current readUsingIcebergReaderUnsafe implementation, since it is the same reader implementation. We should make sure there is no regression by running these benchmarks (for which it would be fine to leave this method here), but I don't want to accumulate essentially dead code testing the same thing.
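To make the comparison concrete, here is a minimal JMH sketch of the two benchmark methods under discussion. RowReader and its wiring are hypothetical stand-ins for the PR's actual reader types; the point is that both methods end up exercising the same reader implementation:

```java
import java.io.IOException;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Benchmark)
public class ReaderPathComparisonSketch {
  // Hypothetical stand-in for the PR's reader types; both paths are expected
  // to resolve to the same underlying implementation.
  interface RowReader {
    Object readAll() throws IOException;
  }

  private RowReader directReader;
  private RowReader registryReader;

  @Setup
  public void setup() {
    // In the real benchmark these would be a reader built directly and the
    // same reader resolved through the new FormatModel registry.
    directReader = () -> new Object();
    registryReader = directReader;
  }

  @Benchmark
  @Threads(1)
  public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
    // Baseline: the existing direct reader path.
    blackhole.consume(directReader.readAll());
  }

  @Benchmark
  @Threads(1)
  public void readUsingRegistryReader(Blackhole blackhole) throws IOException {
    // New path: same implementation resolved via the registry, so the two
    // methods should report near-identical numbers.
    blackhole.consume(registryReader.readAll());
  }
}
```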
Resolved review thread (outdated) on ...src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java

Resolved review thread (outdated) on ...k/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
rdblue left a comment
Looks good overall. I'd prefer to update the benchmark cases since they use the same readers/writers, but that's fairly minor.
```java
@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(
    Schema icebergSchema, MessageType type, StructType dfSchema) {
  return (ParquetValueWriter<T>)
      ParquetWithSparkSchemaVisitor.visit(
          dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
          type,
          new WriteBuilder(type));
}

public static <T> ParquetValueWriter<T> buildWriter(
    StructType dfSchema, MessageType type, Schema icebergSchema) {
  return (ParquetValueWriter<T>)
      ParquetWithSparkSchemaVisitor.visit(
          dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
          type,
          new WriteBuilder(type));
}
```
Is it intentional to have two functions with different signature ordering here? It might be confusing.
I've given this quite a bit of thought. On the caller side we use the order icebergSchema, fileSchema, engineSchema, and I believe this is the most logical ordering. If anyone feels strongly otherwise, I'm happy to adjust it.
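For illustration, a hypothetical call site following that ordering, assuming the surrounding class is SparkParquetWriters and using the first overload from the diff above (variable and method names here are illustrative only):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

class BuildWriterCallSite {
  // Illustrative call site: Iceberg schema first, then the file (Parquet)
  // schema, then the engine (Spark) schema.
  static ParquetValueWriter<InternalRow> writerFor(
      Schema icebergSchema, MessageType fileSchema, StructType sparkType) {
    return SparkParquetWriters.buildWriter(icebergSchema, fileSchema, sparkType);
  }
}
```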
```java
.metricsConfig(metricsConfig)
.withPartition(partition)
.overwrite()
.metricsConfig(metricsConfig)
```
Suggested change:

```diff
 .metricsConfig(metricsConfig)
 .withPartition(partition)
 .overwrite()
-.metricsConfig(metricsConfig)
```

`.metricsConfig(metricsConfig)` is called twice.
Sorry, I saw this too late. Created another PR: #15356
```java
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = null;
this.table = table;
this.format = dataFileFormat;
```
Suggested change:

```diff
-this.format = dataFileFormat;
+this.format = deleteFileFormat;
```

`this.format` is used in `newPositionDeleteWriter`, so I think it should be the delete file format.
```java
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
this.table = table;
this.format = dataFileFormat;
```
Suggested change:

```diff
-this.format = dataFileFormat;
+this.format = deleteFileFormat;
```

`this.format` is used in `newPositionDeleteWriter`, so I think it should be the delete file format.
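A minimal sketch of the point, with illustrative class and method names rather than the PR's exact code: the factory keeps a single format field that the position-delete writer path consults, so it has to be initialized from the delete file format:

```java
import org.apache.iceberg.FileFormat;

// Illustrative sketch, not the PR's actual class: storing the data file
// format here would make position deletes come out in the wrong format.
class DeleteWriterFactorySketch {
  private final FileFormat format;

  DeleteWriterFactorySketch(FileFormat dataFileFormat, FileFormat deleteFileFormat) {
    // The suggested fix: position deletes must be written in the delete
    // file format, so `format` is initialized from deleteFileFormat.
    this.format = deleteFileFormat;
  }

  FileFormat positionDeleteFormat() {
    return format; // what a newPositionDeleteWriter-style method would consult
  }
}
```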
Merged to main.
Note that #15356 contains a follow-up fix to address the comments above.
Part of: #12298
Implementation of the new API: #12774
SparkFormatModel and related changes