API, Spark: Handle binary, fixed types in expression in RemoteScanPlanning #14882
base: main
Conversation
  Expression filter = null;
  if (jsonNode.has(RESIDUAL_FILTER)) {
-   filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL_FILTER));
+   filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL_FILTER), spec.schema());
This was caught during the execution phase in Spark; the schema needs to be passed when parsing the residual filter.
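For context, a minimal sketch of the round-trip problem this hunk addresses. The schema-less and schema-aware `fromJson` overloads are the ones visible in the diff above; the column name and values below are made up for illustration:

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionParser;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

public class ResidualFilterRoundTrip {
  public static void main(String[] args) {
    // hypothetical table with a binary column
    Schema schema =
        new Schema(Types.NestedField.required(1, "payload", Types.BinaryType.get()));

    // a residual filter on the binary column, as the planner would produce it
    Expression filter =
        Expressions.equal("payload", ByteBuffer.wrap(new byte[] {0x1F, 0x2A}));
    String json = ExpressionParser.toJson(filter);

    // without a schema the literal comes back as a string and fails later at execution
    Expression untyped = ExpressionParser.fromJson(json);

    // with the schema the literal can be converted back to the binary type
    Expression typed = ExpressionParser.fromJson(json, schema);

    System.out.println(untyped);
    System.out.println(typed);
  }
}
```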
Force-pushed 7d7dcaa to 51d4aab
Force-pushed 51d4aab to f071179
  }
- if (request.filter() != null) {
-   configuredScan = configuredScan.filter(request.filter());
+ Expression filter = request.filter(schema);
nit: this change is probably not needed
I removed the .filter() check because we are marking it as deprecated. With this approach we first try to deserialize (and bail out early if it's null), and based on that nullability we feed the result into the scan below. Please let me know what you think, considering the above.
 * @param schema the table schema to use for type-aware deserialization of filter values
 * @return the filter expression, or null if no filter was specified
 */
public Expression filter(Schema schema) {
Let me think about this a bit more. I also think we have a few more cases across the codebase where we ser/de an Expression without a Schema, and theoretically we would have the same issue in those places as well.
Whatever approach we pick, we'd want to follow up in those other places too.
The other thing we might need to consider is how we would lazily bind this in other client implementations. @Fokko, does pyiceberg have examples of how it does late binding similar to this one?
The issue we have here is that we deserialize an Expression that we can only deserialize correctly once we bind it to a Schema.
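To make the late-binding point concrete, here is a hedged sketch (illustrative names, not code from this PR) of where the schema dependency shows up: an expression reconstructed without type information is only checked against the column types when it is bound.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

public class LateBindingSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema(Types.NestedField.required(1, "payload", Types.BinaryType.get()));

    // an unbound predicate, e.g. one deserialized from JSON where the binary value
    // survived only as a string
    Expression unbound = Expressions.equal("payload", "1f2a");

    // binding resolves the column reference and converts the literal to the column's
    // type; with the Literals change in this PR the string converts to binary here,
    // without it this bind step is where the failure would surface
    Expression bound = Binder.bind(schema.asStruct(), unbound, true /* case sensitive */);
    System.out.println(bound);
  }
}
```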
Force-pushed f071179 to 35ae73a
  */
  private static <T extends Scan<T, FileScanTask, ?>> T configureScan(
-     T scan, PlanTableScanRequest request) {
+     T scan, PlanTableScanRequest request, Schema schema) {
The schema to use would always be the configured schema for the scan no? In that case I think we can get rid of the 3rd argument and just do request.filter(configuredScan.schema()) below
You mean use scan.schema()? Because the configured scan, post-select, would cause an issue if I have a filter on a column that is not being projected. I think we can do that; let me refactor.
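For reference, a rough sketch of the direction discussed here (illustrative, not necessarily the code that finally landed; it assumes the filter(Schema) accessor added in this PR):

```java
// Illustrative sketch of configureScan without the extra Schema parameter, using the
// scan's own schema to deserialize the filter.
private static <T extends Scan<T, FileScanTask, ?>> T configureScan(
    T scan, PlanTableScanRequest request) {
  T configuredScan = scan;

  // scan.schema() is the table schema before any column projection is applied, so a
  // filter on a column that is not selected can still be resolved with the right type
  Expression filter = request.filter(scan.schema());
  if (filter != null) {
    configuredScan = configuredScan.filter(filter);
  }

  return configuredScan;
}
```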
 * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link #filter(Schema)} instead for
 *     proper type-aware deserialization
 */
@Deprecated
I'm not sure about deprecating this. In the long run we do expect the expression filter to be self-describing with the grammar that's being proposed by @rdblue; in particular we'd expect a data type at that point in the literal and for ExpressionParser.fromJson(json) to just work for that case.
So in the long run, I'd actually expect filter(Schema) to be deprecated, we just need it for now due to limitations in the protocol.
I think we should probably just keep both (with the limitation that filter() won't work for the 3 specific data types) and then drop the filter(Schema) if/when that proposal materializes.
Unless there are other cases we can envision where we'd want to pass a schema here?
 * @deprecated since 1.11.0, will be removed in 1.12.0; this method serializes the expression to
 *     JSON immediately, which may lose type information for BINARY, FIXED, and DECIMAL types
 */
@Deprecated
Same rationale as above, I'm not sure we should deprecate this because I think in the long run we'd expect such an API to exist. We can avoid API/deprecation churn by just keeping both until any grammar/protocol changes are made.
Force-pushed f6c0ac7 to 7b6465f
Update: I synced with Amogh offline on this. Thanks a ton for brainstorming with me and suggesting this alternative in the first place. We bind anyway during planning later on; all this lacked was handling of a converter case, which I added.
Force-pushed 7b6465f to f336aca
      BigDecimal decimal = new BigDecimal(value().toString());
      return (Literal<T>) new DecimalLiteral(decimal);

    case FIXED:
this is great. I think it would be good to have a unit test in the api module for this change
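A hedged sketch of what such a unit test could look like; the hex-string behavior is taken from this PR's description of the change (valid hex converts, invalid hex or a wrong-length fixed value returns null), and the test names and assertion style are illustrative:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.ByteBuffer;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

public class TestStringToBinaryLiteralConversions {
  @Test
  public void testHexStringToBinary() {
    // valid hex strings (either case) convert to a binary literal
    Literal<ByteBuffer> binary = Literal.of("1F2a").to(Types.BinaryType.get());
    assertThat(binary).isNotNull();
    assertThat(binary.value()).isEqualTo(ByteBuffer.wrap(new byte[] {0x1F, 0x2A}));
  }

  @Test
  public void testInvalidHexStringReturnsNull() {
    // strings that are not valid hex cannot be converted
    assertThat(Literal.of("not-hex").to(Types.BinaryType.get())).isNull();
  }

  @Test
  public void testFixedLengthMismatchReturnsNull() {
    // a fixed(2) literal must decode to exactly 2 bytes
    assertThat(Literal.of("1F2A3B").to(Types.FixedType.ofLength(2))).isNull();
  }
}
```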
@TestTemplate
@Disabled(
    "binary filter that is used by Spark is not working because ExpressionParser.fromJSON doesn't have the Schema to properly parse the filter expression")
you might want to remove this for the other Spark versions as well in this PR, since the change is very small
nastra left a comment
LGTM and thanks for fixing this @singhpk234. It would be good to have a unit test in the api module for the change in Literals, so that we don't rely on integration tests
Force-pushed ecfda34 to 1f1eb11
…rk versions
- Add unit tests in the api module for string-to-binary and string-to-fixed conversions
- Test valid hex string conversions (uppercase and lowercase)
- Test that invalid hex strings return null
- Test that a fixed type with the wrong length returns null
- Remove @Disabled testBinaryInFilter from Spark v3.4, v3.5, and v4.0
- The fix in Literals.java now properly handles binary/fixed types
- Tests should work across all Spark versions
Force-pushed 1f1eb11 to efd6006

About the change
Presently the expression, when serialized, doesn't capture the type, so even a binary value is treated as a string after deserialization, which fails later. For the parsers it's important to know the schema so that they can deserialize values correctly. Part of this is handled in the SDK during response deserialization via the parser context, but while the client can set this context (since it is making the call), the same can't be assumed by the server, which has to do the same deserialization of the request.
There are three ways to solve this problem:
1. filter(schema), and then wire the schema through deserialization in ExpressionParser.

Considering where we are, I implemented Approach #1.
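Under Approach #1, the server-side wiring could look roughly like the following (a hedged sketch with imports omitted; the method name and the `table`/`request` parameters are placeholders, only request.filter(schema) is the accessor this PR adds):

```java
// Sketch of the server-side planning path: the table schema drives filter deserialization
static TableScan planWithTypedFilter(Table table, PlanTableScanRequest request) {
  Schema schema = table.schema();
  Expression filter = request.filter(schema);

  TableScan scan = table.newScan();
  if (filter != null) {
    // BINARY, FIXED, and DECIMAL literals are reconstructed with their column types
    scan = scan.filter(filter);
  }

  return scan;
}
```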
Testing
New test and existing tests