[WIP] V4 Manifest Read Support #14533
base: main
| * Manifest deletion vector entry (V4+ only) - marks entries in a manifest as deleted without | ||
| * rewriting the manifest. | ||
| */ | ||
| MANIFEST_DV(5); |
I prefer the option of having the DV located in a field of the data or delete manifest record. That way we don't have to wait to find the DV before processing a manifest file. Not sure what others think here, but since the DV metadata/content is likely going to be different between the Metadata DV (inline) and Data DV (stored in Puffin), I don't see much value in trying to reuse metadata fields for it.
That was my preference too, and I advocated for it in our community discussion but this is what we settled on. Our current v4 proposal specifically uses MANIFEST_DV as a separate content type that references manifests via the referenced_file field. We can certainly change it, but want to hear from others.
I just checked with Amogh and I don't think that this has been firmly decided yet. From the implementation here, I think we should embed the DV in manifest-specific metadata.
| * <p>When present, the deletion vector is stored inline in the manifest rather than in a separate | ||
| * Puffin file. | ||
| */ | ||
| ByteBuffer inlineContent(); |
I mentioned this in my comment below, but I don't think there's much value in combining the inline MDV metadata and fields to track data DVs stored in Puffin. These aren't overlapping, so I'd keep them separate.
Data and metadata DVs seemed like similar concepts to me. But I don't mind changing it.
| * <p>Contains status, snapshot ID, sequence numbers, and first-row-id. Optional - may be null if | ||
| * tracking info is inherited. | ||
| */ | ||
| TrackingInfo trackingInfo(); |
I think that this also requires a field to be defined, so we have a record of the ID used for it.
Yep. Will add it as soon as we define it. Right now it's TBD in the proposal.
Let's just assign it now and update the proposal. I also have example assignments in my exploration if you want to be consistent:
fn tracked_file_write_schema(
    table_schema: &Schema,
    stats_modes: Option<&HashMap<FieldId, StatsMode>>,
) -> Schema {
    let default = HashMap::from_iter((0..=32).into_iter().map(|id| (id, DEFAULT_STATS_MODE)));
    let modes: &HashMap<FieldId, StatsMode> = stats_modes.unwrap_or_else(|| &default);
    StructType::new([
        required("tracking_info", tracking_info_schema(), 147), // TODO: assigned ID
        required("content", DataType::INTEGER, 134), // now required!
        required("location", DataType::STRING, 100),
        required("file_format", DataType::STRING, 101),
        required("record_count", DataType::LONG, 103),
        required("file_size_in_bytes", DataType::LONG, 104),
        required(
            "content_stats",
            content_stats_schema(table_schema, modes),
            146,
        ), // TODO: ID is from stats proposal
        optional("key_metadata", DataType::BINARY, 131),
        optional("split_offsets", ArrayType::new(DataType::LONG, false), 132), // TODO: missing element ID (133)
        optional("content_slice", slice_schema(), 148), // TODO: assigned ID
        optional("referenced_file", DataType::STRING, 143),
        optional("manifest_stats", manifest_stats_schema(), 149), // TODO: assigned ID
        optional("min_sequence_number", DataType::LONG, 516),
    ])
}
What is content_slice?
I assume this is the content_info struct from the proposal?
| * @throws IllegalStateException if content_type is not DATA | ||
| * @throws UnsupportedOperationException if ContentStats not yet implemented | ||
| */ | ||
| DataFile asDataFile(PartitionSpec spec); |
I'm not sure that we want to pass in the spec, since the record contains an ID. Wouldn't it be better to pass in a map of specs by ID when reading manifests so that this is already known when adapting to DataFile?
Yes, I thought about it, but was not sure which would be cleaner.
For this API, I think it is cleaner to call asDataFile() without any arguments.
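A minimal sketch of the no-argument variant discussed above, assuming the reader is handed a map of specs by ID when it is created. `SpecSketch`, `DataFileSketch`, and `TrackedFileSketch` are hypothetical stand-ins for illustration, not Iceberg classes, and the real adapter may resolve the spec differently.

```java
import java.util.Map;

// Hypothetical stand-in for PartitionSpec; only the ID matters here.
final class SpecSketch {
  private final int specId;

  SpecSketch(int specId) {
    this.specId = specId;
  }

  int specId() {
    return specId;
  }
}

// Hypothetical stand-in for the DataFile view produced by the adapter.
final class DataFileSketch {
  private final String location;
  private final SpecSketch spec;

  DataFileSketch(String location, SpecSketch spec) {
    this.location = location;
    this.spec = spec;
  }

  String location() {
    return location;
  }

  SpecSketch spec() {
    return spec;
  }
}

// Hypothetical tracked-file record: it stores the spec ID it was written with,
// and the reader injects the specs-by-ID map when it creates the record.
final class TrackedFileSketch {
  private final String location;
  private final int specId;
  private final Map<Integer, SpecSketch> specsById;

  TrackedFileSketch(String location, int specId, Map<Integer, SpecSketch> specsById) {
    this.location = location;
    this.specId = specId;
    this.specsById = specsById;
  }

  // No spec argument: the spec is resolved from the ID stored in the record.
  DataFileSketch asDataFile() {
    SpecSketch spec = specsById.get(specId);
    if (spec == null) {
      throw new IllegalStateException("Unknown partition spec ID: " + specId);
    }

    return new DataFileSketch(location, spec);
  }
}
```

With this shape the caller never has to look up the right spec itself; an unknown spec ID fails loudly instead of adapting the file with the wrong spec.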
| DeleteFile asDeleteFile(PartitionSpec spec); | ||
|
|
||
| /** Set the status for this tracked file entry. */ | ||
| void setStatus(TrackingInfo.Status status); |
This API should not expose any setter methods. The implementation used can, if needed, for things like inherited metadata. But the API interface itself should not force implementations to be mutable. In general, we want to think of the API interfaces as immutable.
I had it as immutable initially, and added mutability while implementing inheritable metadata. That means that in places which need mutability, we will need to downcast it, which is fine I think.
I see. I didn't realize that we had added the setter methods to the ManifestEntry interface. Looks like that was probably allowed because ManifestEntry is in core and not exposed in the public API. Instead, DataFile and DeleteFile are in the API.
Here, I think we need to be more careful. This is very likely going to be in the API alongside DataFile, so it should be an immutable interface. To avoid downcasting, the manifest reader should configure its Parquet reader to produce the concrete class, GenericTrackedFile. That class can be mutable so the reader can pass instances to InheritableTrackedMetadata, and then the reader should return those instances as TrackedFile (because it is CloseableIterable<TrackedFile>). That shouldn't require casting.
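The pattern described here can be sketched roughly as follows. All names (`TrackedFileView`, `MutableTrackedFile`, `ReaderSketch`) are hypothetical stand-ins rather than the PR's classes; the point is only that the reader holds the concrete mutable type internally and hands out the read-only interface, so no cast is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical read-only API interface: getters only, no setters.
interface TrackedFileView {
  String location();

  Long snapshotId();
}

// Hypothetical concrete class used inside the reader. It is mutable so that
// inherited metadata can be filled in before the record is handed out.
final class MutableTrackedFile implements TrackedFileView {
  private final String location;
  private Long snapshotId;

  MutableTrackedFile(String location, Long snapshotId) {
    this.location = location;
    this.snapshotId = snapshotId;
  }

  @Override
  public String location() {
    return location;
  }

  @Override
  public Long snapshotId() {
    return snapshotId;
  }

  void setSnapshotId(Long newSnapshotId) {
    this.snapshotId = newSnapshotId;
  }
}

final class ReaderSketch {
  private ReaderSketch() {}

  // The reader sees the concrete type, applies inheritance, and returns the
  // interface type. Callers only ever see TrackedFileView.
  static List<TrackedFileView> read(List<MutableTrackedFile> decoded, long inheritedSnapshotId) {
    List<TrackedFileView> result = new ArrayList<>();
    for (MutableTrackedFile file : decoded) {
      if (file.snapshotId() == null) {
        file.setSnapshotId(inheritedSnapshotId);
      }

      result.add(file);
    }

    return result;
  }
}
```

Because `MutableTrackedFile` is assignable to `TrackedFileView`, the upcast on return is implicit and the setter never leaks into the public interface.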
| */ | ||
| public interface TrackingInfo { | ||
| /** Status of an entry in a tracked file */ | ||
| enum Status { |
Isn't this enum already defined somewhere?
It is in ManifestEntry.Status, but it is in core. For v4, we need it in the API.
Okay, this is fine for now. We may want to move it later, but we'll see what makes sense.
| @Override | ||
| public CloseableIterable<FileScanTask> doPlanFiles() { | ||
| Snapshot snapshot = snapshot(); | ||
|
|
Nit: Avoid unnecessary whitespace changes. They cause conflicts.
| : 2; | ||
|
|
||
| if (formatVersion >= 4) { | ||
| return planV4Files(snapshot, io); |
Rather than modifying data table scan right now, let's leave this out. We don't need to plug anything into table scans at this point, since that is just a configuration API.
Sounds good. I have reverted this file. Will send a followup PR to wire this up.
| * | ||
| * <p>Use this method to copy data without stats when collecting files. | ||
| */ | ||
| F copyWithoutStats(); |
Is it intentional that we removed copyWithStats(Set<Integer> requestedColumnIds) from ContentFile?
It will be supported. I left it out for now because the stats were still being defined by @nastra.
| } | ||
| } | ||
|
|
||
| Types.NestedField STATUS = |
I'd like to have a static schema() method on these classes that defines the current schema, not just fields. That way it's easy to see how the schema is constructed and easy to reference for the read path.
| } | ||
|
|
||
| Types.NestedField STATUS = | ||
| Types.NestedField.required(0, "status", Types.IntegerType.get(), "Entry status"); |
Other field docs expand the enum symbols: "Entry status: 0=existing, 1=added, 2=deleted/removed"
| 1, | ||
| "snapshot_id", | ||
| Types.LongType.get(), | ||
| "Snapshot ID where the file was added, or deleted if status is 2. Inherited when null."); |
I think this should be "Snapshot ID where the file was added or deleted/removed". No need to mention specific status codes or inheritance. People reading these docs should never see values without a snapshot ID because of inheritance so I don't think there is value in adding it here because this is primarily for metadata tables.
| 3, | ||
| "sequence_number", | ||
| Types.LongType.get(), | ||
| "Data sequence number of the file. Inherited when null and status is 1 (added). Must be equal to file_sequence_number if content_type is 3 or 4."); |
Similar to my comment above, this should not describe behavior. Instead, describe what the data sequence number means.
| * <p>Inherited when null and status is 1 (added). Must be equal to file_sequence_number if | ||
| * content_type is 3 or 4. | ||
| */ | ||
| Long sequenceNumber(); |
Since this is a new interface, should we use dataSequenceNumber instead?
| * | ||
| * <p>If content_type is 0 (DATA), this is the _row_id for the first row in the data file. If | ||
| * content_type is 3 (DATA_MANIFEST), this is the starting _row_id to assign to rows added by | ||
| * ADDED data files. |
As with the schema fields, I don't think that it is valuable to describe the format behavior here, either.
| * @param entry the tracked file entry | ||
| * @return the entry with metadata applied | ||
| */ | ||
| TrackedFile<?> apply(TrackedFile<?> entry); |
This should either be GenericTrackedFile (the internal representation) or an interface in core, like TrackedFileSetters. I would probably use GenericTrackedFile.
I think we also need to consider what setter methods to add here. We have inverted the relationship between TrackedFile / DataFile and TrackingInfo / ManifestEntry so I don't think it makes sense to call setSnapshotId on TrackedFile itself just because that is the top-level record in manifests. I'd rather get the TrackingInfo and modify that directly:
TrackedFile apply(GenericTrackedFile file) {
  TrackingInfoStruct info;
  if (file.trackingInfo() instanceof TrackingInfoStruct) {
    info = (TrackingInfoStruct) file.trackingInfo();
  } else {
    info = new TrackingInfoStruct(file.trackingInfo()); // copy to mutable
  }

  if (info.snapshotId() == null) {
    info.setSnapshotId(snapshotId);
  }

  ...
  file.setTrackingInfo(info);
  return file;
}
Thanks. I've implemented something along the lines of your suggestion. PTAL.
| * | ||
| * @param <F> the concrete class of a TrackedFile instance | ||
| */ | ||
| public interface TrackedFile<F> { |
I don't think there's much value in parameterizing TrackedFile by F. The reason why ContentFile is parameterized is that we want to share classes between DataFile and DeleteFile, like ManifestReader<F extends ContentFile<F>>. Then we can return new ManifestReader<>(...) to produce both ManifestReader<DataFile> and ManifestReader<DeleteFile>.
I think that v4 is going to be different because TrackedFile is not a parent of DeleteFile and DataFile. Instead, TrackedFile is used as a generic container for DataFile, DeleteFile, and ManifestFile and the v4 root manifest reader can produce all 3 of those types mixed together. The reader will be a CloseableIterable<TrackedFile> rather than CloseableIterable<F extends TrackedFile<F>> because TrackedFile can produce a DataFile via asDataFile(). It is not always itself a DataFile.
To simplify, I think we can drop the type param here and use wrapper classes that call asDataFile() or similar.
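A rough sketch of what dropping the type parameter could look like, assuming a mixed list of entries and a wrapper that adapts only the data entries. `KindSketch`, `EntrySketch`, `DataViewSketch`, and `DataFilesSketch` are hypothetical names for illustration; the real content types and wrapper classes are defined by the proposal and the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical content kinds; the real content types come from the v4 proposal.
enum KindSketch {
  DATA,
  DELETES,
  MANIFEST
}

// Hypothetical typed view that callers actually want to consume.
final class DataViewSketch {
  private final String location;

  DataViewSketch(String location) {
    this.location = location;
  }

  String location() {
    return location;
  }
}

// Un-parameterized container: an entry is not itself a data file, but it can
// produce one when its kind is DATA.
final class EntrySketch {
  private final KindSketch kind;
  private final String location;

  EntrySketch(KindSketch kind, String location) {
    this.kind = kind;
    this.location = location;
  }

  KindSketch kind() {
    return kind;
  }

  DataViewSketch asDataFile() {
    if (kind != KindSketch.DATA) {
      throw new IllegalStateException("Not a data file: " + location);
    }

    return new DataViewSketch(location);
  }
}

final class DataFilesSketch {
  private DataFilesSketch() {}

  // Wrapper over a mixed list: keeps DATA entries and adapts each one.
  static List<DataViewSketch> dataFiles(List<EntrySketch> mixed) {
    List<DataViewSketch> result = new ArrayList<>();
    for (EntrySketch entry : mixed) {
      if (entry.kind() == KindSketch.DATA) {
        result.add(entry.asDataFile());
      }
    }

    return result;
  }
}
```

The reader's element type stays a single plain type, and the typed views are produced at the edge where a caller needs them.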
Thanks. That simplifies things a fair bit.
| void setFirstRowId(Long firstRowId); | ||
|
|
||
| /** Set the ordinal position in the manifest. */ | ||
| void setPos(Long position); |
I don't think that this is needed. In the v3 implementation, we use a column that projects _pos without calling a setter.
| * @param manifestEntry the DATA_MANIFEST or DELETE_MANIFEST tracked file from root | ||
| * @return inheritable metadata instance | ||
| */ | ||
| static InheritableTrackedMetadata fromTrackedFile(TrackedFile<?> manifestEntry) { |
Methods like this should accept a typed file, like ManifestFile, instead of converting. That way we can keep the conversion logic in a single place and the caller can either pass an existing ManifestFile or call trackedFile.asManifestFile().
Removed it for now.
| manifestEntry.contentType()); | ||
|
|
||
| TrackingInfo tracking = manifestEntry.trackingInfo(); | ||
| Long snapshotId = tracking != null ? tracking.snapshotId() : null; |
Tracking info should never be null for a manifest passed into fromManifest. If it is, then this should complain that the manifest is missing its tracking info and appears to be uncommitted. (For context, to read an uncommitted manifest, the caller must use either InheritableTrackingMetadata.forCopy or InheritableTrackingMetadata.empty() but we don't want to include this hint in the error message.)
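One way to express this check, as a sketch only: the factory rejects a manifest without tracking info instead of substituting null values. `TrackingSketch`, `ManifestSketch`, and `InheritedSketch` are hypothetical stand-ins, and the error wording is an assumption rather than the message used in the PR.

```java
// Hypothetical stand-in for a manifest's tracking info.
final class TrackingSketch {
  private final long snapshotId;
  private final long sequenceNumber;

  TrackingSketch(long snapshotId, long sequenceNumber) {
    this.snapshotId = snapshotId;
    this.sequenceNumber = sequenceNumber;
  }

  long snapshotId() {
    return snapshotId;
  }

  long sequenceNumber() {
    return sequenceNumber;
  }
}

// Hypothetical stand-in for a typed manifest; tracking is null when uncommitted.
final class ManifestSketch {
  private final String location;
  private final TrackingSketch tracking;

  ManifestSketch(String location, TrackingSketch tracking) {
    this.location = location;
    this.tracking = tracking;
  }

  String location() {
    return location;
  }

  TrackingSketch tracking() {
    return tracking;
  }
}

final class InheritedSketch {
  private final long snapshotId;
  private final long sequenceNumber;

  private InheritedSketch(long snapshotId, long sequenceNumber) {
    this.snapshotId = snapshotId;
    this.sequenceNumber = sequenceNumber;
  }

  long snapshotId() {
    return snapshotId;
  }

  long sequenceNumber() {
    return sequenceNumber;
  }

  // Fails fast on missing tracking info rather than propagating nulls.
  static InheritedSketch fromManifest(ManifestSketch manifest) {
    TrackingSketch tracking = manifest.tracking();
    if (tracking == null) {
      throw new IllegalArgumentException(
          "Manifest is missing tracking info and appears to be uncommitted: "
              + manifest.location());
    }

    return new InheritedSketch(tracking.snapshotId(), tracking.sequenceNumber());
  }
}
```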
| Long sequenceNumber = tracking != null ? tracking.sequenceNumber() : null; | ||
|
|
||
| Preconditions.checkArgument( | ||
| snapshotId != null, "Manifest entry must have snapshot ID: %s", manifestEntry.location()); |
The term "manifest entry" is confusing in v4 because it could be referring to an entry in a manifest (the original definition) or a tracked file for a manifest. This is why we are using the term "tracked file" now. Method names, variable names, and error messages for v4 metadata should always replace "manifest entry" with a more specific and less ambiguous term.
|
|
||
| private BaseInheritableTrackedMetadata(long snapshotId, long sequenceNumber) { | ||
| this.snapshotId = snapshotId; | ||
| this.sequenceNumber = sequenceNumber; |
I think that this should also track the location of the manifest and update TrackingInfo with it. This is used at commit time to know where a given data file came from so that we can rewrite the containing manifest (v3) or update a MDV (v4) without searching for the containing manifest (unless it has been rewritten or removed).
| * content_type is 3 (DATA_MANIFEST), this is the starting _row_id to assign to rows added by | ||
| * ADDED data files. | ||
| */ | ||
| Long firstRowId(); |
I think this should also include manifestLocation() and manifestPos() for tracking information. See https://github.com/apache/iceberg/pull/14533/files#r2539910058
| entry -> { | ||
| Long pos = entry.pos(); | ||
| // positions are 0-based and should not exceed Integer.MAX_VALUE | ||
| return pos == null || !deletedPositions.contains(pos.intValue()); |
I think that it is a bad practice to assume a row should be filtered (or some other action) based on a missing value like this, similar to the missing tracking_info cases below.
The problem is that this will cause silent data loss if pos is incorrectly projected as null because of a bug in another part of the codebase. This filter depends on having a valid pos so it should fail if pos is null rather than choosing to skip data.
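A sketch of the fail-fast version of this filter, assuming a plain `Set<Long>` in place of the deserialized Roaring bitmap. `PosEntrySketch` and `LivePositionFilter` are hypothetical names for illustration, not the classes in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical entry that carries its ordinal position in the manifest.
final class PosEntrySketch {
  private final String location;
  private final Long pos;

  PosEntrySketch(String location, Long pos) {
    this.location = location;
    this.pos = pos;
  }

  String location() {
    return location;
  }

  Long pos() {
    return pos;
  }
}

final class LivePositionFilter {
  private LivePositionFilter() {}

  // Returns the entries whose position is not marked deleted. A null position
  // is treated as a bug upstream and raises an error; it is never used to
  // decide whether an entry is kept or dropped.
  static List<PosEntrySketch> liveEntries(List<PosEntrySketch> entries, Set<Long> deletedPositions) {
    List<PosEntrySketch> live = new ArrayList<>();
    for (PosEntrySketch entry : entries) {
      Long pos = entry.pos();
      if (pos == null) {
        throw new IllegalStateException(
            "Cannot apply manifest DV: missing position for " + entry.location());
      }

      if (!deletedPositions.contains(pos)) {
        live.add(entry);
      }
    }

    return live;
  }
}
```

If the position column is ever projected incorrectly, this surfaces the bug as an exception at read time instead of silently returning or dropping entries.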
Fixed.
| } catch (IOException e) { | ||
| throw new RuntimeIOException(e, "Failed to deserialize Roaring bitmap from manifest DV"); | ||
| } | ||
| return bitmap; |
Style: In Iceberg, control flow blocks should be separated from the statements following them by an empty newline.
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
Thanks for the feedback, Ryan. I will send a new revision. |
|
@rdblue Thanks for the thorough feedback! I've updated the PR incorporating pretty much all of it. PTAL. |
Introduces the foundational types for V4 manifest format support:
- TrackedFile interface as unified representation for all V4 entry types
- DeletionVector and ManifestStats interfaces
- GenericTrackedFile implementation and test

- Replace select() with filterData(), projectStats(), project() methods
- Remove firstRowId from readRoot(); only leaf manifests need assignment
- Use MetadataColumns.ROW_POSITION for safe position tracking
- Always project tracking_info for correctness
- Add WRITE_TYPE schema excluding read-only metadata columns

- Change TrackedFile.contentStats() return type from Object to ContentStats
- Add helper methods to convert ContentStats to Map-based stats for DataFile/DeleteFile compatibility
- TrackedDataFile and TrackedDeleteFile now properly translate:
  - valueCounts
  - nullValueCounts
  - nanValueCounts
  - lowerBounds
  - upperBounds
- Add filterContentStats for copyWithStats support
WIP PR for s.apache.org/iceberg-single-file-commit
Implemented so far: