Core, Spark 4.1: Fix querying equality deletes with schema evolution #15268
Conversation
Force-pushed 5d78e6b to 8528e5f
Do we want this in 1.11?

@singhpk234, possibly nice to include as the use case is pretty basic? Can you help review?
private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);

private final Table table;
private final Schema tableSchema;
We only used tableSchema to resolve equality delete fields. We have to switch to all schemas in the table.

Hm, have we considered just building a union schema?

Ah, nvm, we probably want to do that lazily, since on average we wouldn't expect to have to reference the historical fields.

It is tricky as we don't know which IDs are used in equality delete files.
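The lazy approach being discussed could look roughly like the following self-contained sketch. The `LazyFieldLookup` name and the plain `Map<Integer, String>` values are illustrative stand-ins for Iceberg's actual `Schema`/`Types.NestedField` types, not the PR's code:

```java
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative sketch of the lazy idea: resolve a field ID against the current
// schema first, and only build the expensive historic index on a cache miss.
class LazyFieldLookup implements Function<Integer, String> {
  private final Map<Integer, String> currentFields;
  private final Supplier<Map<Integer, String>> historicLoader;
  private Map<Integer, String> historicFields; // built lazily, at most once

  LazyFieldLookup(Map<Integer, String> currentFields,
                  Supplier<Map<Integer, String>> historicLoader) {
    this.currentFields = currentFields;
    this.historicLoader = historicLoader;
  }

  @Override
  public String apply(Integer id) {
    String field = currentFields.get(id);
    if (field != null) {
      return field; // happy path: no historic schemas ever loaded
    }
    if (historicFields == null) {
      historicFields = historicLoader.get(); // expensive: may read metadata JSON
    }
    return historicFields.get(id);
  }
}
```

Since equality delete files do not declare up front which schema their field IDs come from, the lookup has to be prepared to fall back to all historic schemas, but the fallback path is only paid on a miss.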
// field lookup for serializable tables that assumes fetching historic schemas is expensive
private static class FieldLookup implements Function<Integer, Types.NestedField> {
  private final Table table;
Maybe drop empty line.
// this class is not meant to be exposed beyond the delete file index
private static class EqualityDeleteFile {
  private final PartitionSpec spec;
  private final Function<Integer, Types.NestedField> fieldLookup;
We previously used spec.schema() to resolve equality fields. That's not OK.
return field != null ? field : historicSchemaFields().get(id);
}

private Map<Integer, Types.NestedField> historicSchemaFields() {
Not sure about historic. Maybe previousSchemaFields() or oldSchemaFields()?
I actually like historic, but I'm also good with previous or prior; not very opinionated here.
Then historic it is.
singhpk234
left a comment
Thanks @aokolnychyi, this LGTM overall. I suggested some minor changes and linked a discussion you might be interested in.
private static Collection<Schema> historicSchemas(Table table) {
  return table.schemas().values().stream()
      .filter(schema -> schema.schemaId() != table.schema().schemaId())
      .collect(Collectors.toList());
}
}
This would now require loading the TableMetadata on the executor, which is absolutely fine. Please check why this could be a potential issue: #14944
We've been loading the table metadata on the executor for a while, at least for other use cases like metadata tables, so we're not quite just doing it "now". I believe the specific concern is that, when loading table metadata from executors, they read the table metadata JSON lazily when they need parts of the table metadata that are not stored in SerializableTable. For use cases like server-side planning, there may not be any credentials to actually read the metadata JSON directly.

I think that's still ultimately a separate issue, and given this change, I believe we'd see it specifically when there are equality deletes returned as part of scan planning.

I think we can look at that as needed, so that executors can resolve table metadata via different approaches (a naive approach would be to just broadcast the whole serializable table for those cases, or to more lazily resolve the metadata by issuing load table requests), not just having to read the metadata file.

Since the immediate use cases for scan planning tend not to return equality deletes, and the worst case is failure anyway, I don't think this is something to concern ourselves with for the 1.11 release, but it's something we should fix at some point.
To add, I would just test our remote planning with this change and make sure things behave as expected. I was planning on doing that for the 1.11 release voting anyways.
> I don't think this is something to concern ourselves with for the 1.11 release

I am on the same page :), just posted here as an FYI, since there have been some discussions about it!
Correct, that's why I implemented the happy case where the field is in the current schema. So the only case where we will need to load the extra schemas and read the JSON file on executors is when we can't find the equality delete fields in the current schema.

What do you think about this as is, @amogh-jahagirdar @singhpk234?
Sounds great to me! I think this is totally fine, since the case we have in mind (untrusted clients) will not return equality deletes from the server.
}
}

// indexes all fields from schemas, preferring field definitions from higher schema IDs
minor: may be OK to mention this is for cases like type promotion?
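The "preferring field definitions from higher schema IDs" behavior can be illustrated with a self-contained sketch. The `SchemaVersion` and `FieldIndexer` names and the string-typed fields are hypothetical stand-ins for Iceberg's actual `Schema` and `Types.NestedField` types:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative: a simplified schema version mapping field IDs to type names
class SchemaVersion {
  final int schemaId;
  final Map<Integer, String> fields; // field ID -> type name, simplified

  SchemaVersion(int schemaId, Map<Integer, String> fields) {
    this.schemaId = schemaId;
    this.fields = fields;
  }
}

class FieldIndexer {
  // index fields from all schema versions; entries from higher schema IDs win,
  // so a type-promoted field (e.g. int -> long) keeps its widened definition
  static Map<Integer, String> indexFields(List<SchemaVersion> schemas) {
    Map<Integer, String> index = new HashMap<>();
    schemas.stream()
        .sorted(Comparator.comparingInt(s -> s.schemaId)) // ascending, so later puts overwrite
        .forEach(s -> index.putAll(s.fields));
    return index;
  }
}
```

This is why type promotion matters here: if field 1 was `int` in schema 1 and promoted to `long` in schema 2, the index must return the `long` definition.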
- Types.NestedField field = spec.schema().findField(id);
+ Types.NestedField field = fieldLookup.apply(id);
  Preconditions.checkArgument(field != null, "Cannot find field for ID %s", id);
[doubt] What would have been the previous behaviour when the field was null in spec.schema()? Did we fail later?
I just copied the new test into a branch without the fix to check this. Yes, it fails when we try to prune out which equality deletes to apply based on stats, because we won't be able to resolve the field in the first place.
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L233 fails with an NPE.
Correct, the test I added previously failed.
Set<Integer> seenSchemaIds = Sets.newHashSet();

for (Schema schema : sortByIdAsc(schemas)) {
  if (!seenSchemaIds.contains(schema.schemaId())) {
Can we make it a sorted set instead?
That's a good idea.
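The suggestion amounts to something like the following self-contained sketch (the `SchemaIds` helper and integer-only inputs are illustrative, not the PR's actual code):

```java
import java.util.List;
import java.util.TreeSet;

// Illustrative: a TreeSet deduplicates schema IDs and yields them in ascending
// order, replacing the explicit sort-then-check-seen loop above.
class SchemaIds {
  static List<Integer> distinctIdsAscending(List<Integer> schemaIds) {
    return List.copyOf(new TreeSet<>(schemaIds));
  }
}
```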
singhpk234
left a comment
LGTM, thanks @aokolnychyi!
amogh-jahagirdar
left a comment
Thanks @aokolnychyi this looks good to me!
Thank you, @singhpk234 @amogh-jahagirdar!
This PR fixes querying tables with equality deletes if identity columns used to create the equality delete file are no longer part of the current or time travel schema.
The following scenario fails without the changes in this PR:
An equality delete file is written against the `status` column, and `status` is later dropped from the table.
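A minimal sketch of such a scenario in Spark SQL, assuming illustrative table and column names (not taken from the PR's actual test), where an engine such as Flink has written equality delete files keyed on `status`:

```sql
-- create a table whose status column will be referenced by equality deletes
CREATE TABLE db.events (id BIGINT, status STRING) USING iceberg;

-- ... an upserting writer produces equality delete files keyed on status ...

-- schema evolution: drop the column referenced by existing equality deletes
ALTER TABLE db.events DROP COLUMN status;

-- without the fix, this read fails (NPE while resolving the equality delete
-- field), because the field ID no longer exists in the current schema
SELECT * FROM db.events;
```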