[SPARK-55121][PYTHON][SS] Add DataStreamReader.name() to Classic PySpark #53898
Conversation
JIRA Issue Information: Task SPARK-55121 (this comment was automatically generated by GitHub Actions)
Force-pushed from 02fa47f to df74e5f.
HyukjinKwon left a comment:
I am fine with this but I think it would be better if @HeartSaVioR has some time to take a look.
anishshri-db left a comment:
lgtm pending green CI
gaogaotiantian left a comment:
Some final minor suggestions :)
### What changes were proposed in this pull request?
This PR adds the `name()` method to Classic PySpark's `DataStreamReader` class. This method allows users to specify a name for streaming sources, which is used in checkpoint metadata and enables stable checkpoint locations for source evolution.
Changes include:
- Add `name()` method to `DataStreamReader` in `python/pyspark/sql/streaming/readwriter.py`
- Add comprehensive test suite in `python/pyspark/sql/tests/streaming/test_streaming_reader_name.py`
- Update compatibility test to mark `name` as currently missing from Connect (until the Connect PR merges)
The method validates that the `source_name` contains only ASCII letters, digits, and underscores, raising `PySparkTypeError` or `PySparkValueError` for invalid inputs.
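For illustration, here is a minimal sketch of the validation described above. The helper name, the exact messages, and the pattern constant are assumptions for this sketch; the actual implementation lives in `python/pyspark/sql/streaming/readwriter.py` and uses PySpark's error classes (e.g. `VALUE_NOT_NON_EMPTY_STR`, per the commit notes further down):

```python
import re

from pyspark.errors import PySparkTypeError, PySparkValueError

# Illustrative sketch only; plain messages are used here to keep it
# self-contained, whereas the real method raises structured error classes.
_SOURCE_NAME_PATTERN = re.compile(r"[A-Za-z0-9_]+")


def validate_source_name(source_name):
    """Validate a streaming source name as described above."""
    # Type is checked first so a non-str value gets a type error rather than
    # a confusing "invalid name" error.
    if not isinstance(source_name, str):
        raise PySparkTypeError(
            f"source_name must be a str, got {type(source_name).__name__}"
        )
    # fullmatch with "+" also rejects empty strings, and whitespace is not in
    # the character class, so whitespace-only names fail too.
    if not _SOURCE_NAME_PATTERN.fullmatch(source_name):
        raise PySparkValueError(
            f"source_name must contain only ASCII letters, digits, and "
            f"underscores; got {source_name!r}"
        )
    return source_name
```

In the Classic reader, the actual `name()` method would additionally forward the validated name to the underlying JVM reader and return `self` so it can be chained with `format()`, `option()`, and `load()`.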
### Why are the changes needed?
This brings Classic PySpark to feature parity with the Scala/Java API for streaming source naming. The `name()` method is essential for:
1. Identifying sources in checkpoint metadata
2. Enabling stable checkpoint locations during source evolution
3. Providing consistency across Classic and Connect implementations
### Does this PR introduce _any_ user-facing change?
Yes. Users can now call `.name()` on `DataStreamReader` in Classic PySpark:
```python
spark.readStream.format("parquet").name("my_source").load("/path")
```
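As a complementary, hypothetical example (assuming an active `spark` session as above), an invalid name is rejected by the validation described earlier:

```python
from pyspark.errors import PySparkValueError

try:
    # "my-source" contains a hyphen, which is outside [A-Za-z0-9_].
    spark.readStream.format("parquet").name("my-source").load("/path")
except PySparkValueError as err:
    print("rejected:", err)
```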
### How was this patch tested?
- Added comprehensive unit tests in `test_streaming_reader_name.py` covering:
- Valid name patterns (letters, digits, underscores)
- Invalid names (hyphens, spaces, dots, special characters, empty strings, None, wrong types); see the sketch after this list
- Method chaining
- Different data formats (parquet, json)
- Integration with streaming queries
- Updated compatibility tests to account for the current state where Classic has `name` but Connect doesn't yet
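A condensed, hypothetical sketch of that invalid-name coverage using `subTest` (the real suite in `test_streaming_reader_name.py` is more thorough; the class and fixture names here are assumptions):

```python
import unittest

from pyspark.errors import PySparkTypeError, PySparkValueError
from pyspark.sql import SparkSession


class StreamingReaderNameSketch(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_invalid_names(self):
        # Hyphens, spaces, dots, special characters, and empty strings all
        # fall outside [A-Za-z0-9_]+ and should be rejected.
        for bad in ["my-source", "my source", "my.source", "name$", "name#", "name!", ""]:
            with self.subTest(name=bad):
                with self.assertRaises(PySparkValueError):
                    self.spark.readStream.format("parquet").name(bad)

    def test_wrong_types(self):
        # Non-string values should fail the type check before the name check.
        for bad in [None, 123, ["x"]]:
            with self.subTest(name=bad):
                with self.assertRaises(PySparkTypeError):
                    self.spark.readStream.format("parquet").name(bad)


if __name__ == "__main__":
    unittest.main()
```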
### Was this patch authored or co-authored using generative AI tooling?
Yes.
- Fix empty string validation to use VALUE_NOT_NON_EMPTY_STR error
- Check type before checking emptiness to avoid confusing error messages
- Combine all invalid name tests into single test with subTests
- Use PySparkValueError instead of generic Exception for invalid names
- Add more invalid name test cases (dollar, hash, exclamation)

- Remove redundant empty string validation check: empty and whitespace-only strings are now caught by the regex pattern validation
- Remove unnecessary str() wrapper since type is already validated
- Consolidate empty string test into invalid names test
- Merge None and wrong type tests into single test method with multiple cases
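A quick illustration of that last point, assuming the regex pattern from the sketch earlier: empty and whitespace-only names fall out of the same check that rejects other invalid characters.

```python
import re

NAME_PATTERN = re.compile(r"[A-Za-z0-9_]+")  # assumed pattern, per the description above

for candidate in ["", "   ", "ok_name_1", "bad-name"]:
    print(repr(candidate), bool(NAME_PATTERN.fullmatch(candidate)))
# '' -> False, '   ' -> False, 'ok_name_1' -> True, 'bad-name' -> False
```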
Force-pushed from 787bc26 to f9df920.