Skip to content

fix: materialize ConstantColumnVector on Comet's serialize/export paths#4532

Open
schenksj wants to merge 7 commits into
apache:mainfrom
schenksj:fix/materialize-constant-column-vector
Open

fix: materialize ConstantColumnVector on Comet's serialize/export paths#4532
schenksj wants to merge 7 commits into
apache:mainfrom
schenksj:fix/materialize-constant-column-vector

Conversation

@schenksj

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4527.

Rationale for this change

Spark wraps file-source partition columns and other per-batch constants in ConstantColumnVector. When such a batch reaches Comet's serialization path (Utils.getBatchFieldVectors, used by broadcast/shuffle) or the FFI export path (NativeUtil.exportBatch), it was rejected with:

Comet execution only takes Arrow Arrays, but got ...ConstantColumnVector

This is a standalone fix; it was surfaced while working on the Delta Lake contrib integration (the OPTIMIZE / deletion-vector rewrite paths pull constants through a Comet operator), so prioritizing it helps that effort, but it applies to any plan that routes a constant column through a Comet operator.

What changes are included in this PR?

  • ConstantColumnVectors.materialize (in the org.apache.spark.sql.comet.execution.arrow package) builds a fresh Arrow FieldVector holding the constant repeated numRows times. It reuses the existing per-type ArrowFieldWriters, so it covers every type -- scalars, decimal, timestamps, and complex struct/array/map -- and stays in sync with Spark's type handling, rather than a hand-rolled per-type switch.
  • Utils.materializeConstantColumnVector exposes it to the serialization path.
  • New match arms in Utils.getBatchFieldVectors and NativeUtil.exportBatch materialize a ConstantColumnVector instead of throwing. The existing CometVector path is untouched.

How are these changes tested?

New test in UtilsSuite round-trips a batch with a value ConstantColumnVector and a null ConstantColumnVector through serializeBatches / decodeBatches and asserts the materialized values (and nulls) survive. The test fails on main with the "only takes Arrow Arrays" exception and passes with this change. UtilsSuite (3/3) and CometExecSuite (126/0) pass. The FFI exportBatch arm shares the same materializeConstantColumnVector helper.

schenksj and others added 3 commits May 29, 2026 23:10
Spark wraps file-source partition columns and other per-batch constants in
ConstantColumnVector. When such a batch reaches Comet's serialization path
(Utils.getBatchFieldVectors, used by broadcast/shuffle) or FFI export path
(NativeUtil.exportBatch), it was rejected with "Comet execution only takes
Arrow Arrays".

Materialize the constant into a fresh Arrow FieldVector (the constant repeated
numRows times) inline. The materializer reuses the existing per-type
ArrowFieldWriters, so it covers every type -- scalars, decimal, timestamps, and
complex struct/array/map -- and stays in sync with Spark's type handling.

Adds ConstantColumnVectors.materialize (arrow package) +
Utils.materializeConstantColumnVector, with new match arms in
getBatchFieldVectors and exportBatch.

Closes apache#4527

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@schenksj

Copy link
Copy Markdown
Contributor Author

Gentle ping — still looking for a first review on this one. Small, self-contained fix. Thanks!

@andygrove

Copy link
Copy Markdown
Member

A few observations from a review pass. Acknowledging up front that this comment was drafted with help from AI (Claude Code).

The reuse of the existing per-type ArrowFieldWriters is exactly the right design choice — every Spark type comes along for free and the logic stays in sync with non-constant batches.

A few notes:

  1. Hardcoded "UTC" timezone in Utils.materializeConstantColumnVector. The choice is defensible since Spark stores TimestampType as micros in UTC, but elsewhere in this file toArrowSchema requires callers to pass the session timezone explicitly. If a batch flows through getBatchFieldVectors containing both a ConstantColumnVector of TimestampType and a non-constant timestamp column, would they end up with mismatched Arrow timezone metadata? Worth either threading a timeZoneId parameter through getBatchFieldVectors (sourced from SQLConf.sessionLocalTimeZone), or adding a comment explaining why "UTC" is safe in this serialization context.

  2. Test coverage in UtilsSuite. Nice round-trip test. Would you mind adding a case or two beyond IntegerType? A nullable StructType or ArrayType constant exercises a different ArrowFieldWriter code path (via getStruct(rowId) / getArray(rowId) on ConstantColumnVector), and a TimestampType constant would pin down the timezone choice in case anyone later changes it. The materializer claims to support every type and it'd be useful to prove it for at least the complex case.

  3. FFI exportBatch arm. It shares materializeConstantColumnVector with getBatchFieldVectors, so the materializer itself is covered. The surrounding code (Data.exportVector + the allocator handoff) isn't exercised by the new test. If easy, a small smoke test that runs a ConstantColumnVector-containing batch through an actual Comet operator path would catch a regression in the FFI wiring. Optional.

  4. CI flake. The one failing check (Spark 4.1 sql_core-3) annotates as The hosted runner lost communication with the server — infrastructure, not the PR. The matching sql_core-3 job on Spark 3.5 passes. A re-run should clear it.

schenksj and others added 3 commits June 19, 2026 09:27
Addresses review feedback on the ConstantColumnVector materialization:

- Expand the comment on the hardcoded "UTC" zone in
  materializeConstantColumnVector to explain why UTC is correct rather than
  the session-local timezone: Comet's non-constant TimestampType columns are
  Arrow vectors exported from native execution tagged Timestamp(us, "UTC"), so
  materialising the constant as UTC keeps its Arrow field metadata consistent
  with its sibling timestamp columns in the same VectorSchemaRoot. Threading
  the session-local timezone would instead introduce the mismatch.

- Add a TimestampType constant round-trip test (pins the UTC choice) and a
  nullable StructType constant test (exercises the getStruct(rowId)/getChild
  ArrowFieldWriter path, including a null nested field and a wholly-null
  struct) beyond the existing IntegerType case.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds NativeUtilSuite covering the ConstantColumnVector arm of
NativeUtil.exportBatch: a batch carrying Spark ConstantColumnVectors is
exported across the Arrow C Data Interface and imported back, exercising
materializeConstantColumnVector + Data.exportVector + the allocator handoff
that the serializeBatches test does not reach. Includes scalar (value + null)
and a nullable struct constant to cover the complex-type FFI path.

Mirrors the export/import round trip NativeUtil.getNextBatch performs in
production. Verified red/green: removing the ConstantColumnVector arm in
exportBatch makes the test fail with "Comet execution only takes Arrow Arrays".

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@schenksj

Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review, @andygrove! All four points are addressed. (Disclosure: this PR's changes and this response were written with help from AI — Claude Code.)

1. Hardcoded "UTC" timezone. I went with the comment route, because on investigation "UTC" turns out to be the correct choice here rather than just a defensible one — and threading the session-local timezone would actively introduce the mismatch you were worried about. These constants are materialized alongside the batch's non-constant columns in the same VectorSchemaRoot, and Comet's non-constant TimestampType columns are Arrow vectors exported from native execution, where Comet always tags them Timestamp(µs, "UTC") (see native serde.rs). Spark itself stores TimestampType as micros in UTC, so the constant's value is already a UTC instant. Tagging the materialized constant "UTC" keeps its Arrow timezone metadata consistent with its sibling timestamp columns; threading SQLConf.sessionLocalTimeZone would make them diverge. I've expanded the comment in materializeConstantColumnVector to spell this out.

2. Test coverage in UtilsSuite. Added two cases beyond IntegerType:

  • A nullable StructType constant (with a null nested field and a wholly-null struct) — exercises the getStruct(rowId) / getChild(ordinal) ArrowFieldWriter path.
  • A TimestampType constant — round-trips the micros and pins down the "UTC" choice, guarding against anyone later swapping the zone argument.

3. FFI exportBatch arm. Done — added NativeUtilSuite, which runs a ConstantColumnVector-containing batch (scalar value, null, and a struct constant) through the actual Arrow C Data Interface via NativeUtil.exportBatchToAddresses + importVector — the same export/import round trip getNextBatch performs in production, so it covers Data.exportVector + the allocator handoff. I verified it's a real guard: removing the ConstantColumnVector arm in exportBatch makes it fail with Comet execution only takes Arrow Arrays.

4. CI flake. Agreed — the Spark 4.1 / sql_core-3 failure was the runner-setup infra flake. I've also merged latest main into the branch (it was out of date with base), so a fresh CI run picks all of this up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ConstantColumnVector inputs fail Comet export with "Comet execution only takes Arrow Arrays"

2 participants