fix: materialize ConstantColumnVector on Comet's serialize/export paths #4532
schenksj wants to merge 7 commits into
Spark wraps file-source partition columns and other per-batch constants in ConstantColumnVector. When such a batch reaches Comet's serialization path (Utils.getBatchFieldVectors, used by broadcast/shuffle) or FFI export path (NativeUtil.exportBatch), it was rejected with "Comet execution only takes Arrow Arrays".

Materialize the constant into a fresh Arrow FieldVector (the constant repeated numRows times) inline. The materializer reuses the existing per-type ArrowFieldWriters, so it covers every type -- scalars, decimal, timestamps, and complex struct/array/map -- and stays in sync with Spark's type handling.

Adds ConstantColumnVectors.materialize (arrow package) + Utils.materializeConstantColumnVector, with new match arms in getBatchFieldVectors and exportBatch.

Closes apache#4527

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
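For readers who want the shape of the change without opening the diff, here is a minimal sketch of the idea in plain Java. It is a toy model only: `Column`, `ArrayColumn`, `ConstantColumn`, and `fieldVectors` are hypothetical stand-ins invented for illustration, not Spark, Arrow, or Comet classes, and it handles a single nullable int type instead of going through the per-type ArrowFieldWriters.

```java
// Toy model only: every class here is a hypothetical stand-in,
// not a Spark, Arrow, or Comet API.
final class ConstantMaterializeSketch {

    /** Stand-in for any column that can appear in a batch. */
    interface Column {}

    /** Stand-in for an Arrow-backed column: one slot per row, null models SQL NULL. */
    static final class ArrayColumn implements Column {
        final Integer[] values;
        ArrayColumn(Integer[] values) { this.values = values; }
    }

    /** Stand-in for a constant column: one value shared by every row. */
    static final class ConstantColumn implements Column {
        final Integer value; // null models a null constant
        final int numRows;
        ConstantColumn(Integer value, int numRows) {
            this.value = value;
            this.numRows = numRows;
        }
    }

    /** Repeat the constant numRows times into a fresh array-backed column. */
    static ArrayColumn materialize(ConstantColumn c) {
        Integer[] out = new Integer[c.numRows];
        java.util.Arrays.fill(out, c.value);
        return new ArrayColumn(out);
    }

    /** Models the new match arm: constants are materialized instead of rejected. */
    static ArrayColumn[] fieldVectors(Column[] batch) {
        ArrayColumn[] out = new ArrayColumn[batch.length];
        for (int i = 0; i < batch.length; i++) {
            Column col = batch[i];
            if (col instanceof ArrayColumn) {
                out[i] = (ArrayColumn) col;                  // existing path, untouched
            } else if (col instanceof ConstantColumn) {
                out[i] = materialize((ConstantColumn) col);  // new arm
            } else {
                throw new IllegalArgumentException("Comet execution only takes Arrow Arrays");
            }
        }
        return out;
    }
}
```

Before the new arm existed, the constant column would have fallen through to the final branch, which is the failure mode described above.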
Gentle ping: still looking for a first review on this one. Small, self-contained fix. Thanks!
A few observations from a review pass. Acknowledging up front that this comment was drafted with help from AI (Claude Code). The reuse of the existing per-type A few notes:
Addresses review feedback on the ConstantColumnVector materialization:

- Expand the comment on the hardcoded "UTC" zone in materializeConstantColumnVector to explain why UTC is correct rather than the session-local timezone: Comet's non-constant TimestampType columns are Arrow vectors exported from native execution tagged Timestamp(us, "UTC"), so materialising the constant as UTC keeps its Arrow field metadata consistent with its sibling timestamp columns in the same VectorSchemaRoot. Threading the session-local timezone would instead introduce the mismatch.
- Add a TimestampType constant round-trip test (pins the UTC choice) and a nullable StructType constant test (exercises the getStruct(rowId)/getChild ArrowFieldWriter path, including a null nested field and a wholly-null struct) beyond the existing IntegerType case.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
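The UTC argument can be illustrated with a small sketch. This is a toy model under stated assumptions: `TimestampField` and `zonesAgree` are hypothetical names standing in for Arrow field metadata and a schema consistency check, and the zone "America/Los_Angeles" is just an example session-local value. The second helper shows that the stored value is a count of microseconds since the epoch, so the zone tag is metadata and does not change the number.

```java
// Toy model only: TimestampField and zonesAgree are hypothetical,
// not Arrow or Comet APIs.
final class TimestampTagSketch {

    /** Stand-in for an Arrow field of type Timestamp(us, timezone). */
    static final class TimestampField {
        final String name;
        final String timezone;
        TimestampField(String name, String timezone) {
            this.name = name;
            this.timezone = timezone;
        }
    }

    /** True when every timestamp field in the modelled root carries the same zone tag. */
    static boolean zonesAgree(TimestampField[] fields) {
        for (TimestampField f : fields) {
            if (!f.timezone.equals(fields[0].timezone)) {
                return false;
            }
        }
        return true;
    }

    /** Microseconds since the epoch; independent of whichever zone tag the field carries. */
    static long toEpochMicros(java.time.Instant instant) {
        return java.time.temporal.ChronoUnit.MICROS.between(java.time.Instant.EPOCH, instant);
    }
}
```

In this model, a sibling column tagged "UTC" next to a constant materialized as "UTC" agrees, while a constant tagged with the session-local zone would not, which is the mismatch the commit message warns about.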
…stant-column-vector
Adds NativeUtilSuite covering the ConstantColumnVector arm of NativeUtil.exportBatch: a batch carrying Spark ConstantColumnVectors is exported across the Arrow C Data Interface and imported back, exercising materializeConstantColumnVector + Data.exportVector + the allocator handoff that the serializeBatches test does not reach.

Includes scalar (value + null) and a nullable struct constant to cover the complex-type FFI path. Mirrors the export/import round trip NativeUtil.getNextBatch performs in production.

Verified red/green: removing the ConstantColumnVector arm in exportBatch makes the test fail with "Comet execution only takes Arrow Arrays".

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Thanks for the thorough review, @andygrove! All four points are addressed. (Disclosure: this PR's changes and this response were written with help from AI — Claude Code.) 1. Hardcoded 2. Test coverage in
3. FFI 4. CI flake. Agreed — the |
Which issue does this PR close?
Closes #4527.
Rationale for this change
Spark wraps file-source partition columns and other per-batch constants in ConstantColumnVector. When such a batch reaches Comet's serialization path (Utils.getBatchFieldVectors, used by broadcast/shuffle) or the FFI export path (NativeUtil.exportBatch), it was rejected with: "Comet execution only takes Arrow Arrays".
This is a standalone fix; it was surfaced while working on the Delta Lake contrib integration (the OPTIMIZE / deletion-vector rewrite paths pull constants through a Comet operator), so prioritizing it helps that effort, but it applies to any plan that routes a constant column through a Comet operator.
What changes are included in this PR?
- ConstantColumnVectors.materialize (in the org.apache.spark.sql.comet.execution.arrow package) builds a fresh Arrow FieldVector holding the constant repeated numRows times. It reuses the existing per-type ArrowFieldWriters, so it covers every type -- scalars, decimal, timestamps, and complex struct/array/map -- and stays in sync with Spark's type handling, rather than a hand-rolled per-type switch.
- Utils.materializeConstantColumnVector exposes it to the serialization path.
- Utils.getBatchFieldVectors and NativeUtil.exportBatch materialize a ConstantColumnVector instead of throwing. The existing CometVector path is untouched.
How are these changes tested?
New test in UtilsSuite round-trips a batch with a value ConstantColumnVector and a null ConstantColumnVector through serializeBatches/decodeBatches and asserts the materialized values (and nulls) survive. The test fails on main with the "only takes Arrow Arrays" exception and passes with this change.
UtilsSuite (3/3) and CometExecSuite (126/0) pass. The FFI exportBatch arm shares the same materializeConstantColumnVector helper.
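The shape of that round-trip test can be sketched in plain Java. This is a toy model, not the UtilsSuite test itself: `materialize`, `serialize`, and `decode` are hypothetical stand-ins for the materializer and for serializeBatches/decodeBatches, and the byte layout (a row count, then a validity flag and an int per slot) is invented for illustration and is not the Arrow IPC format.

```java
// Toy model only: hypothetical stand-ins, not Comet's serializeBatches/decodeBatches.
final class RoundTripSketch {

    /** Materialize a constant (null models a null constant) as numRows repeated slots. */
    static Integer[] materialize(Integer constant, int numRows) {
        Integer[] out = new Integer[numRows];
        java.util.Arrays.fill(out, constant);
        return out;
    }

    /** Write the row count, then a validity flag and a value for each slot. */
    static byte[] serialize(Integer[] column) {
        java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
        try (java.io.DataOutputStream out = new java.io.DataOutputStream(bytes)) {
            out.writeInt(column.length);
            for (Integer v : column) {
                out.writeBoolean(v != null);
                out.writeInt(v == null ? 0 : v); // placeholder 0 for null slots
            }
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Inverse of serialize: rebuild the slots, restoring nulls from the validity flags. */
    static Integer[] decode(byte[] data) {
        try (java.io.DataInputStream in =
                 new java.io.DataInputStream(new java.io.ByteArrayInputStream(data))) {
            Integer[] column = new Integer[in.readInt()];
            for (int i = 0; i < column.length; i++) {
                boolean valid = in.readBoolean();
                int value = in.readInt();
                column[i] = valid ? value : null;
            }
            return column;
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }
}
```

A check in the same spirit as the real test would materialize one value constant and one null constant, pass each through `serialize` and `decode`, and assert that both the repeated values and the nulls survive.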