Core, Data, Spark: Moving Spark to use the new FormatModel API #15328

Merged
pvary merged 7 commits into apache:main from pvary:spark_model
Feb 18, 2026


Conversation

pvary (Contributor) commented Feb 15, 2026

Part of: #12298
Implementation of the new API: #12774

SparkFormatModel and related changes
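
For context, the sketch below shows the general shape of the registry pattern this PR moves Spark onto: the engine registers a format model once, and generic read/write paths look it up by file format instead of branching per format. All names here (FormatModelRegistrySketch, register, lookup) are illustrative placeholders, not the actual Iceberg API; see #12774 for the real interfaces.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: names are placeholders, not the Iceberg API.
final class FormatModelRegistrySketch {
  private static final Map<String, Object> MODELS = new ConcurrentHashMap<>();

  private FormatModelRegistrySketch() {}

  // An engine integration (here: Spark) registers one model per file format.
  static void register(String fileFormat, Object model) {
    MODELS.put(fileFormat, model);
  }

  // Generic read/write paths look the model up instead of hard-coding
  // per-format branches for Parquet/ORC/Avro.
  static Object lookup(String fileFormat) {
    return MODELS.get(fileFormat);
  }
}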

singhpk234 (Contributor) commented Feb 16, 2026

Can we run the Spark benchmarks to see how they turn out after these changes? https://github.com/apache/iceberg/tree/main/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark

pvary (Contributor, author) commented Feb 16, 2026

> Can we run the Spark benchmarks to see how they turn out after these changes? https://github.com/apache/iceberg/tree/main/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark

Added new benchmark cases for Parquet (readUsingRegistryReader, readWithProjectionUsingRegistryReader, and writeUsingRegistryWriter, in both the flat and nested data benchmarks):

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReader                          ss    5  0.311 ± 0.005   s/op
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReaderUnsafe                    ss    5  0.396 ± 0.018   s/op
SparkParquetReadersFlatDataBenchmark.readUsingRegistryReader                         ss    5  0.326 ± 0.049   s/op
SparkParquetReadersFlatDataBenchmark.readUsingSparkReader                            ss    5  0.408 ± 0.008   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReader            ss    5  0.185 ± 0.018   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe      ss    5  0.363 ± 0.018   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingRegistryReader           ss    5  0.213 ± 0.026   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingSparkReader              ss    5  0.273 ± 0.019   s/op
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReader                        ss    5  0.184 ± 0.018   s/op
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReaderUnsafe                  ss    5  0.219 ± 0.026   s/op
SparkParquetReadersNestedDataBenchmark.readUsingRegistryReader                       ss    5  0.179 ± 0.035   s/op
SparkParquetReadersNestedDataBenchmark.readUsingSparkReader                          ss    5  0.223 ± 0.015   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReader          ss    5  0.077 ± 0.010   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe    ss    5  0.137 ± 0.007   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingRegistryReader         ss    5  0.080 ± 0.006   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingSparkReader            ss    5  0.103 ± 0.003   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter                         ss    5  2.602 ± 0.064   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingRegistryWriter                        ss    5  2.593 ± 0.074   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter                           ss    5  2.594 ± 0.054   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter                       ss    5  1.559 ± 0.022   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingRegistryWriter                      ss    5  1.569 ± 0.043   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter                         ss    5  1.595 ± 0.046   s/op

The differences are barely noticeable in either direction. There should not be any real difference, since the resulting readers and writers run the same code.
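
For anyone who wants to re-run these locally, a minimal JMH runner along these lines should work. This is a sketch using the plain JMH runner API; the class name and include regex are illustrative, and in the repo the benchmarks are normally driven through the Gradle jmh task instead.

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Sketch: runs the Spark Parquet reader/writer benchmarks above via plain JMH.
public class RunSparkParquetBenchmarks {
  public static void main(String[] args) throws RunnerException {
    Options opts =
        new OptionsBuilder()
            .include("SparkParquet(Readers|Writers).*Benchmark")
            .forks(1)
            .build();
    new Runner(opts).run();
  }
}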

.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")

pvary (author):

These tests were failing with Spark 4.1, but it's probably not worth creating a new PR just for this.

Contributor:

I'm okay with this since it isn't production code. It's unlikely that this will cause problems when cherry-picking.

.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests were failing with Spark 4.1, but probably doesn't worth to create a new PR for this.

Contributor:

Should we file an issue to track the underlying Spark 4.1 test failure, so we can fix the root cause later?

pvary (author):

I think both "true" and "false" are OK as well. The issue was that the config was not set at all.

kevinjqliu (Contributor) commented:
we mentioned this PR in the Iceberg <> Spark community sync today. Would be great to get some more 👀 on it!


@Benchmark
@Threads(1)
public void readUsingRegistryReader(Blackhole blackHole) throws IOException {

Contributor:

Do we need to test the direct method vs the registry method? I would expect this to replace the current readUsingIcebergReaderUnsafe implementation, since this is the same reader implementation. We should make sure that there is no regression by running these benchmarks (for which it would be fine to leave this method here), but I don't want to accumulate essentially dead code testing the same thing.

rdblue (Contributor) left a review:

Looks good overall. I'd prefer to update the benchmark cases since they use the same readers/writers, but that's fairly minor.

Comment on lines +84 to 101
@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(
    Schema icebergSchema, MessageType type, StructType dfSchema) {
  return (ParquetValueWriter<T>)
      ParquetWithSparkSchemaVisitor.visit(
          dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
          type,
          new WriteBuilder(type));
}

public static <T> ParquetValueWriter<T> buildWriter(
    StructType dfSchema, MessageType type, Schema icebergSchema) {
  return (ParquetValueWriter<T>)
      ParquetWithSparkSchemaVisitor.visit(
          dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
          type,
          new WriteBuilder(type));
}

Contributor:

Is it intentional to have two functions with the same parameters in a different order? It might be confusing.

pvary (author):

I've given this quite a bit of thought. On the caller side we use the order icebergSchema, fileSchema, engineSchema, and I believe this is the most logical ordering. If anyone feels strongly otherwise, I'm happy to adjust it.
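
For illustration, a call site using the new overload from the diff above and following that ordering might look like this; it assumes the buildWriter overloads live in SparkParquetWriters, and the example class and variable names are otherwise made up.

import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

// Hypothetical caller: Iceberg schema first, then file schema, then engine schema.
class BuildWriterExample {
  static ParquetValueWriter<InternalRow> create(
      Schema icebergSchema, MessageType fileSchema, StructType sparkSchema) {
    return SparkParquetWriters.buildWriter(icebergSchema, fileSchema, sparkSchema);
  }
}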

Comment on lines +215 to +218
.metricsConfig(metricsConfig)
.withPartition(partition)
.overwrite()
.metricsConfig(metricsConfig)

Contributor:

Suggested change:
  .metricsConfig(metricsConfig)
  .withPartition(partition)
  .overwrite()
- .metricsConfig(metricsConfig)

.metricsConfig(metricsConfig) is called twice.
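
A condensed, hypothetical builder shows why the duplicate call is redundant rather than harmful, assuming the setter simply stores the last value passed:

import org.apache.iceberg.MetricsConfig;

// Hypothetical builder: the second metricsConfig(...) call just overwrites the
// first with the same object, so the duplicate line is dead weight, not a bug.
class AppenderBuilderSketch {
  private MetricsConfig metricsConfig;

  AppenderBuilderSketch metricsConfig(MetricsConfig config) {
    this.metricsConfig = config;
    return this;
  }
}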

pvary (author):

Sorry, I saw this too late.
Created a follow-up PR: #15356

this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = null;
this.table = table;
this.format = dataFileFormat;

Contributor:

Suggested change:
- this.format = dataFileFormat;
+ this.format = deleteFileFormat;

this.format is used in newPositionDeleteWriter, so I think it should be the delete file format.
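
A condensed, hypothetical sketch of the concern (class and method names are stand-ins for the real writer factory code, not its actual structure):

import org.apache.iceberg.FileFormat;

// Hypothetical condensation of the constructor above: it stores the data file
// format, but the position-delete path reads this.format, so position deletes
// would be written in the data file format.
class WriterFactorySketch {
  private final FileFormat format;

  WriterFactorySketch(FileFormat dataFileFormat, FileFormat deleteFileFormat) {
    this.format = dataFileFormat; // the suggestion: store deleteFileFormat here
  }

  // Stands in for newPositionDeleteWriter(), which consults this.format.
  FileFormat positionDeleteFormat() {
    return format;
  }
}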

pvary (author):

Added the fix to #15356

this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
this.table = table;
this.format = dataFileFormat;

Contributor:

Suggested change:
- this.format = dataFileFormat;
+ this.format = deleteFileFormat;

this.format is used in newPositionDeleteWriter, so I think it should be the delete file format.

pvary (author):

Added the fix to #15356

pvary merged commit dde0dc2 into apache:main on Feb 18, 2026
32 checks passed
pvary deleted the spark_model branch on February 18, 2026 at 15:10
pvary (Contributor, author) commented Feb 18, 2026

Merged to main.
Thanks @huaxingao, @kevinjqliu, @rdblue for the reviews!

kevinjqliu (Contributor) commented:

Note that #15356 contains a follow-up fix to address the comments above.
