
[SPARK-56166][PYTHON] Use ArrowBatchTransformer.enforce_schema to replace column-wise type coercion logic #54967

Open
Yicong-Huang wants to merge 1 commit into apache:master from Yicong-Huang:SPARK-56166/enforce-schema

Conversation

@Yicong-Huang (Contributor) commented Mar 23, 2026

What changes were proposed in this pull request?

Replace manual column-by-column type coercion with `ArrowBatchTransformer.enforce_schema` in three places:

  1. `ArrowStreamArrowUDTFSerializer.apply_type_coercion` in serializers.py
  2. `ArrowStreamArrowUDFSerializer.create_batch` in serializers.py
  3. `process_results` in worker.py (the scalar Arrow iter UDF path)

Also:

  • Add an `arrow_cast` parameter to `enforce_schema` for strict type matching mode
  • Add `KeyError` handling in `enforce_schema` so missing columns raise a user-friendly error
  • Remove the now-unused `coerce_arrow_array` imports from serializers.py and worker.py
Why are the changes needed?

These three places duplicated the same coerce-and-reassemble logic that `enforce_schema` already provides. Consolidating them reduces code duplication and ensures consistent error handling.

Does this PR introduce any user-facing change?

Error messages for type/schema mismatches in Arrow UDTFs are slightly changed to be consistent with other Arrow UDF error messages.

How was this patch tested?

Existing tests in `test_arrow_udtf.py` and `test_arrow_udf_scalar.py`.

Was this patch authored or co-authored using generative AI tooling?

No.

@Yicong-Huang force-pushed the SPARK-56166/enforce-schema branch from c537147 to e6a55c9 on March 23, 2026 23:43
@Yicong-Huang changed the title from "[SPARK-56166][PYTHON] Use ArrowBatchTransformer.enforce_schema to replace manual type coercion logic" to "[SPARK-56166][PYTHON] Use ArrowBatchTransformer.enforce_schema to replace column-wise type coercion logic" on Mar 24, 2026
```python
# Check whether the batch's column names already match the target schema.
# If so, use index-based access (faster than name lookup).
batch_names = [batch.schema.field(i).name for i in range(batch.num_columns)]
target_names = [field.name for field in arrow_schema]
use_index = batch_names == target_names
```
A reviewer (Contributor) commented:
on use_index, do we need to take care of nested cases?

