Core, Data: File Format API interfaces #12774
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from 4 commits
53 commits
b33aec4
Core, Data: File Format API interfaces
pvary 5b79dbd
Renjie's comments
pvary a1daced
registerObjectModel exception handling fix
pvary 79d7703
Removing the need for data.AppenderBuilder
pvary 8404aa7
Fixed Renjie findings
pvary a00540d
Cosmentic changes
pvary 2a4816a
Steven's comments
pvary ba16b59
ObjectModelRegistry->FileAccessFactory, and AppenederBuilder->WriteBu…
pvary 9976bfb
Review comments by Steven and Russell (some javadoc, and a few method…
pvary 62ea041
Added generic for the input/output of the reader/writer - like 'FileA…
pvary 22ca3d7
New classes for implementation allows for absolutely new interfaces
pvary 598f46d
Default methods for setting multiple config/meta values
pvary 5ce49dc
Return FileReader instead of CloseableIterable from the ReadBuilder
pvary 46a1742
Revert "Return FileReader instead of CloseableIterable from the ReadB…
pvary 026e5e9
Ryan's comments
pvary 3149874
Move interface classes to core
pvary 4659adf
Rename FileAccessFactory to ObjectModelFactory
pvary 40ec8bb
Ryan's next round of comments
pvary f20bb4e
Separate input conversion from witers
pvary 7e91a40
Eduard's comments
pvary a957c47
Fixing parameter names
pvary 8ce2f2e
Ryan's comments
pvary 2b1b10b
Remove builder parameter from data file writers
pvary 26e03b7
Remove parametrization as much as possible
pvary ef41daa
ContentFileWriteBuilder needs a generic parameter
pvary acb2254
Revert transformers, and used engine specific type setting for writer…
pvary 3efd188
Steven's and Russel's comments
pvary e5611f4
Move to writeBuilder/positionDeleteWriteBuilder solution, and depreca…
pvary 3a6a5ed
Use a specific FormatModel to write PositionDeletes
pvary fad5e07
Changes discussed on the sync
pvary 790e282
Steven's comments
pvary 224070e
Remove the FormatModelRegistry.writeBuilder method
pvary 0c439c1
Eduard's comment
pvary ddcf866
Addressing Amogh's, Steven's and Gabor's comments
pvary 2fa0c5f
Removing ReadBuilder.outputSchema per Steven's and Renjie's comments
pvary 8f58a9e
Aihua's comments
pvary b22a91a
Move back to parametrized writers
pvary 8085a4b
Encryption handling
pvary af4f8a3
Revert back to OutputFile in the FormatModel
pvary c17c9ca
Synchronized register method, and only provide writeBuilder, location…
pvary 7c175ea
constantValues -> idToConstant
pvary 7ed56a5
Remove new from the API parameter names
pvary 69461a2
Move to EncryptedOutputFile from OutputFile FormatModel.writeBuilder
pvary fdbb6c0
Ryan's comments
pvary 4ae6ba0
Fix typo in comment
pvary fe743e1
Revert back to prevent re-registering models
pvary b66e891
Ryan's comments
pvary d450ec6
Steven's comment to change the log message for failed registration
pvary 564ba8c
Create a marker class for the Comet reader and a few extra nits
pvary 17f030f
Added back the ReadBuilder.outputSchema method as the attirbute is us…
pvary 74ca9f7
Changes from #12298 based on Ryan's comments
pvary 6cf720a
Last fixes
pvary 7f6cb33
Javadoc comment fixes
pvary
109 changes: 109 additions & 0 deletions
core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.io; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.MetricsConfig; | ||
| import org.apache.iceberg.Schema; | ||
|
|
||
| /** | ||
| * Interface which is implemented by the data file format implementations. The {@link ObjectModel} | ||
| * provides the {@link AppenderBuilder} for the given parameters: | ||
| * | ||
| * <ul> | ||
| * <li>file format | ||
| * <li>engine specific object model | ||
| * <li>{@link ObjectModel.WriteMode} | ||
| * </ul> | ||
| * | ||
| * The {@link AppenderBuilder} is used to write data to the target files. | ||
| * | ||
| * @param <B> type returned by builder API to allow chained calls | ||
| * @param <E> the engine specific schema of the input data | ||
| */ | ||
| public interface AppenderBuilder<B extends AppenderBuilder<B, E>, E> { | ||
| /** Set the file schema. */ | ||
| B schema(Schema newSchema); | ||
|
|
||
| /** | ||
| * Set a writer configuration property which affects the writer behavior. | ||
| * | ||
| * @param property a writer config property name | ||
| * @param value config value | ||
| * @return this for method chaining | ||
| */ | ||
| B set(String property, String value); | ||
|
|
||
| /** Sets each entry of the map as a writer configuration property. */ | ||
| default B set(Map<String, String> properties) { | ||
| properties.forEach(this::set); | ||
| return (B) this; | ||
| } | ||
|
|
||
| /** | ||
| * Set a file metadata property in the created file. | ||
| * | ||
| * @param property a file metadata property name | ||
| * @param value config value | ||
| * @return this for method chaining | ||
| */ | ||
| B meta(String property, String value); | ||
|
|
||
| /** Sets the metrics configuration used for collecting column metrics for the created file. */ | ||
| B metricsConfig(MetricsConfig newMetricsConfig); | ||
|
|
||
| /** Overwrite the file if it already exists. By default, overwrite is disabled. */ | ||
| B overwrite(); | ||
|
|
||
| /** | ||
| * Overwrite the file if it already exists. The default value is <code>false</code>. | ||
| * | ||
| * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility. | ||
| * Use {@link #overwrite()} instead. | ||
| */ | ||
| @Deprecated | ||
| B overwrite(boolean enabled); | ||
|
|
||
| /** | ||
| * Sets the encryption key used for writing the file. If encryption is not supported by the writer | ||
| * then an exception should be thrown. | ||
| */ | ||
| default B fileEncryptionKey(ByteBuffer encryptionKey) { | ||
| throw new UnsupportedOperationException("Not supported"); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the additional authentication data prefix used for writing the file. If encryption is not | ||
| * supported by the writer then an exception should be thrown. | ||
| */ | ||
| default B aadPrefix(ByteBuffer aadPrefix) { | ||
| throw new UnsupportedOperationException("Not supported"); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the engine native schema for the input. Defines the input type when there is N to 1 | ||
| * mapping between the engine type and the Iceberg type, and providing the Iceberg schema is not | ||
| * enough for the conversion. | ||
| */ | ||
| B engineSchema(E newEngineSchema); | ||
|
|
||
| /** Finalizes the configuration and builds the {@link FileAppender}. */ | ||
| <D> FileAppender<D> build() throws IOException; | ||
| } | ||
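
The builder above combines a self-typed generic parameter (`B`) with default methods: the bulk `set(Map)` is implemented once on top of `set(String, String)`, and optional capabilities such as encryption throw unless overridden. The following is a minimal, self-contained sketch of that pattern, not the Iceberg API. `SketchBuilder`, `MapBackedBuilder`, `self()` and the property names are hypothetical stand-ins introduced only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SelfTypedBuilderSketch {

  /** Hypothetical stand-in for a self-typed builder interface; not the Iceberg API. */
  interface SketchBuilder<B extends SketchBuilder<B>> {
    B set(String property, String value);

    // Bulk setter implemented once, on top of the single-property setter.
    default B set(Map<String, String> properties) {
      properties.forEach(this::set);
      return self();
    }

    // Optional capability: implementations that support it override this method.
    default B encryptionKey(byte[] key) {
      throw new UnsupportedOperationException("Not supported");
    }

    // Assumption of this sketch: returning 'this' through a method avoids an unchecked cast.
    B self();
  }

  /** Hypothetical implementation that only collects configuration values. */
  static final class MapBackedBuilder implements SketchBuilder<MapBackedBuilder> {
    private final Map<String, String> config = new LinkedHashMap<>();

    @Override
    public MapBackedBuilder set(String property, String value) {
      config.put(property, value);
      return this;
    }

    @Override
    public MapBackedBuilder self() {
      return this;
    }

    Map<String, String> build() {
      return new LinkedHashMap<>(config);
    }
  }

  /** Chains the single-property and the bulk setter; both return the concrete builder type. */
  static Map<String, String> demo() {
    return new MapBackedBuilder()
        .set("compression", "zstd")
        .set(Map.of("row-group-size", "1024"))
        .build();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because every setter returns `B`, calls can be chained without losing the concrete builder type, which is what lets `build()` be called at the end of the chain in `demo()`.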
106 changes: 106 additions & 0 deletions
core/src/main/java/org/apache/iceberg/io/ObjectModel.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.io; | ||
|
|
||
| import org.apache.iceberg.FileFormat; | ||
|
|
||
| /** | ||
| * Direct conversion is used between file formats and engine internal formats for performance | ||
| * reasons. Object models encapsulate these conversions. | ||
| * | ||
| * <p>{@link ReadBuilder} is provided for reading data files stored in a given {@link FileFormat} | ||
| * into the engine specific object model. | ||
| * | ||
| * <p>{@link AppenderBuilder} is provided for writing engine specific object model to data/delete | ||
| * files stored in a given {@link FileFormat}. | ||
| * | ||
| * <p>Iceberg supports the following object models natively: | ||
| * | ||
| * <ul> | ||
| * <li>generic - reads and writes Iceberg {@link org.apache.iceberg.data.Record}s | ||
| * <li>spark - reads and writes Spark InternalRow records | ||
| * <li>spark-vectorized - vectorized reads for Spark columnar batches. Not supported for {@link | ||
| * FileFormat#AVRO} | ||
| * <li>flink - reads and writes Flink RowData records | ||
| * <li>arrow - vectorized reads into Arrow columnar format. Only supported for {@link | ||
| * FileFormat#PARQUET} | ||
| * </ul> | ||
| * | ||
| * <p>Engines could implement their own object models to leverage Iceberg data file reading and | ||
| * writing capabilities. | ||
| * | ||
| * @param <E> the engine specific schema of the input data for the appender | ||
| */ | ||
| public interface ObjectModel<E> { | ||
| /** The file format which is read/written by the object model. */ | ||
| FileFormat format(); | ||
|
|
||
| /** | ||
| * The name of the object model. Allows users to specify the object model to map the data file for | ||
| * reading and writing. | ||
| */ | ||
| String name(); | ||
|
|
||
| /** | ||
| * The appender builder for the output file which writes the data in the specified file format and | ||
| * accepts the records defined by this object model. The 'mode' parameter defines the input type | ||
| * for the specific writer use-cases. The appender should handle the following input in the | ||
| * specific modes: | ||
| * | ||
| * <ul> | ||
| * <li>The appender's engine specific input type | ||
| * <ul> | ||
| * <li>{@link WriteMode#DATA_WRITER} | ||
| * <li>{@link WriteMode#EQUALITY_DELETE_WRITER} | ||
| * </ul> | ||
| * <li>{@link org.apache.iceberg.deletes.PositionDelete} where the type of the row is the | ||
| * appender's engine specific input type when the 'mode' is {@link | ||
| * WriteMode#POSITION_DELETE_WRITER} | ||
| * </ul> | ||
| * | ||
| * @param outputFile to write to | ||
| * @param mode for the appender | ||
| * @return the appender builder | ||
| * @param <B> The type of the appender builder | ||
| */ | ||
| <B extends AppenderBuilder<B, E>> B appenderBuilder(OutputFile outputFile, WriteMode mode); | ||
|
|
||
| /** | ||
| * The reader builder for the input file which reads the data from the specified file format and | ||
| * returns the records in this object model. | ||
| * | ||
| * @param inputFile to read from | ||
| * @return the reader builder | ||
| * @param <B> The type of the reader builder | ||
| */ | ||
| <B extends ReadBuilder<B>> B readBuilder(InputFile inputFile); | ||
|
|
||
| /** | ||
| * Writer modes. Based on the mode the object model could alter the appender configuration when | ||
| * creating the {@link FileAppender}. | ||
| */ | ||
| enum WriteMode { | ||
| /** Mode for writing data files. */ | ||
| DATA_WRITER, | ||
| /** Mode for writing equality delete files. */ | ||
| EQUALITY_DELETE_WRITER, | ||
| /** Mode for writing position delete files. */ | ||
| POSITION_DELETE_WRITER, | ||
| } | ||
| } | ||
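
The `appenderBuilder` javadoc above says the expected input depends on the write mode: data and equality delete writers take the engine row type directly, while position delete writers take a position delete whose row is of the engine type. A minimal sketch of that rule follows. It assumes a hypothetical `PositionDeleteStub` in place of `org.apache.iceberg.deletes.PositionDelete`, redefines the enum locally so the example is self-contained, and assumes the row payload of a position delete may be absent.

```java
public class WriteModeSketch {

  /** Mirrors the three modes from the diff; redefined here to keep the sketch self-contained. */
  enum WriteMode {
    DATA_WRITER,
    EQUALITY_DELETE_WRITER,
    POSITION_DELETE_WRITER
  }

  /** Hypothetical stand-in for a position delete: file path, row position, optional row. */
  static final class PositionDeleteStub<R> {
    final String path;
    final long position;
    final R row;

    PositionDeleteStub(String path, long position, R row) {
      this.path = path;
      this.position = position;
      this.row = row;
    }
  }

  /** Returns true when the record has the input shape expected for the given mode. */
  static boolean accepts(WriteMode mode, Class<?> engineRowType, Object record) {
    switch (mode) {
      case DATA_WRITER:
      case EQUALITY_DELETE_WRITER:
        // Both modes consume plain rows of the engine specific type.
        return engineRowType.isInstance(record);
      case POSITION_DELETE_WRITER:
        // Position deletes wrap the row; the row is checked only when present (assumption).
        if (!(record instanceof PositionDeleteStub)) {
          return false;
        }
        Object row = ((PositionDeleteStub<?>) record).row;
        return row == null || engineRowType.isInstance(row);
      default:
        throw new IllegalArgumentException("Unknown mode: " + mode);
    }
  }

  public static void main(String[] args) {
    System.out.println(accepts(WriteMode.DATA_WRITER, String.class, "row"));
  }
}
```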
123 changes: 123 additions & 0 deletions
core/src/main/java/org/apache/iceberg/io/ReadBuilder.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.io; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.expressions.Expression; | ||
| import org.apache.iceberg.mapping.NameMapping; | ||
|
|
||
| /** | ||
| * File formats should implement this interface to provide a builder for reading data files. {@link | ||
| * ReadBuilder} reads the data files with the specified parameters. The returned objects are defined | ||
| * by the {@link ObjectModel} which is used to read the data. | ||
| * | ||
| * <p>This interface is directly exposed for the users to parameterize readers. | ||
| * | ||
| * @param <B> type returned by builder API to allow chained calls | ||
| */ | ||
| public interface ReadBuilder<B extends ReadBuilder<B>> { | ||
| /** The configuration key for the batch size in case of vectorized reads. */ | ||
| String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch"; | ||
|
|
||
| /** | ||
| * Restricts the read to the given range: [start, start + length). | ||
| * | ||
| * @param newStart the start position for this read | ||
| * @param newLength the length of the range this read should scan | ||
| */ | ||
| B split(long newStart, long newLength); | ||
|
|
||
| /** Read only the given columns. */ | ||
| B project(Schema newSchema); | ||
|
|
||
| /** | ||
| * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary | ||
| * records. Some readers might not be able to filter some part of the expression. In this case the | ||
| * reader might return unfiltered or partially filtered rows. It is the caller's responsibility to | ||
| * apply the filter again. | ||
| * | ||
| * @param newFilter the filter to set | ||
| * @param filterCaseSensitive whether the filtering is case-sensitive or not | ||
| */ | ||
| default B filter(Expression newFilter, boolean filterCaseSensitive) { | ||
| // Skip filtering if not available | ||
| return (B) this; | ||
| } | ||
|
|
||
| /** | ||
| * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary | ||
| * records. Some readers might not be able to filter some part of the expression. In this case the | ||
| * reader might return unfiltered or partially filtered rows. It is the caller's responsibility to | ||
| * apply the filter again. The default implementation sets the filter to be case-sensitive. | ||
| * | ||
| * @param newFilter the filter to set | ||
| */ | ||
| default B filter(Expression newFilter) { | ||
| return filter(newFilter, true); | ||
| } | ||
|
|
||
| /** | ||
| * Sets configuration key/value pairs for the reader. Reader builders should ignore configuration | ||
| * keys not known to them. | ||
| */ | ||
| default B set(String key, String value) { | ||
| // Skip configuration if not applicable | ||
| return (B) this; | ||
| } | ||
|
|
||
| /** | ||
| * Enables reusing the containers returned by the reader. Decreases pressure on GC. Readers could | ||
| * decide to ignore the user provided setting if it is not supported by them. | ||
| */ | ||
| default B reuseContainers() { | ||
| // Skip container reuse configuration if not applicable | ||
| return (B) this; | ||
| } | ||
|
|
||
| /** | ||
| * Accessors for constant field values. Used for calculating values in the result which are coming | ||
| * from metadata, and not coming from the data files themselves. The keys of the map are the | ||
| * column ids, the values are the accessors generating the values. | ||
| */ | ||
| B constantFieldAccessors(Map<Integer, ?> constantFieldAccessors); | ||
|
|
||
| /** Sets a mapping from external schema names to Iceberg type IDs. */ | ||
| B withNameMapping(NameMapping newNameMapping); | ||
|
|
||
| /** | ||
| * Sets the file encryption key used for reading the file. If encryption is not supported by the | ||
| * reader then an exception should be thrown. | ||
| */ | ||
| default B withFileEncryptionKey(ByteBuffer encryptionKey) { | ||
| throw new UnsupportedOperationException("Not supported"); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the additional authentication data prefix for encryption. If encryption is not supported | ||
| * by the reader then an exception should be thrown. | ||
| */ | ||
| default B withAADPrefix(ByteBuffer aadPrefix) { | ||
| throw new UnsupportedOperationException("Not supported"); | ||
| } | ||
|
|
||
| /** Builds the reader. */ | ||
| <D> CloseableIterable<D> build(); | ||
| } | ||
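
The `filter` javadoc above notes that a reader may return unfiltered or partially filtered rows, so the caller has to apply the filter again. The sketch below illustrates why, under stated assumptions: the hypothetical `readWithPushdown` skips whole batches using a per-batch maximum (similar in spirit to statistics-based skipping) but never filters row by row, and `readAndRefilter` re-applies the predicate on the caller side. None of these names come from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ResidualFilterSketch {

  /**
   * Hypothetical reader with coarse pushdown for the predicate "value >= lowerBound": a batch is
   * skipped only when its maximum shows that no row can match. Kept batches are returned whole.
   */
  static List<Integer> readWithPushdown(List<List<Integer>> batches, int lowerBound) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> batch : batches) {
      int max = Integer.MIN_VALUE;
      for (int value : batch) {
        max = Math.max(max, value);
      }
      if (max >= lowerBound) {
        out.addAll(batch); // may still contain rows that do not match
      }
    }
    return out;
  }

  /** Caller side: re-applies the same predicate to the rows the reader returned. */
  static List<Integer> readAndRefilter(List<List<Integer>> batches, int lowerBound) {
    Predicate<Integer> filter = value -> value >= lowerBound;
    List<Integer> result = new ArrayList<>();
    for (Integer value : readWithPushdown(batches, lowerBound)) {
      if (filter.test(value)) {
        result.add(value);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3, 10), List.of(11, 12));
    System.out.println(readWithPushdown(batches, 10));
    System.out.println(readAndRefilter(batches, 10));
  }
}
```

In this sketch the pushdown only reduces how much data is read. Correctness still depends on the second pass, which matches the contract described in the javadoc.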
44 changes: 44 additions & 0 deletions
data/src/main/java/org/apache/iceberg/data/DataWriterBuilder.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.data; | ||
|
|
||
| import java.io.IOException; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.io.DataWriter; | ||
|
|
||
| /** | ||
| * Builder for generating a {@link DataWriter}. | ||
| * | ||
| * @param <B> type of the builder | ||
| * @param <E> engine specific schema of the input records used for appender initialization | ||
| */ | ||
| public interface DataWriterBuilder<B extends DataWriterBuilder<B, E>, E> | ||
| extends FileWriterBuilderBase<B, E> { | ||
| /** | ||
| * Creates a writer which generates a {@link org.apache.iceberg.DataFile} based on the | ||
| * configurations set. The data writer will expect inputs defined by the {@link | ||
| * #engineSchema(Object)} which should be convertible to the Iceberg schema defined by {@link | ||
| * #schema(Schema)}. | ||
| * | ||
| * @param <D> the type of data that the writer will handle | ||
| * @return a {@link DataWriter} instance configured with the specified settings | ||
| * @throws IOException if an I/O error occurs during the creation of the writer | ||
| */ | ||
| <D> DataWriter<D> dataWriter() throws IOException; | ||
| } |
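
The javadoc above says the writer expects input described by the engine schema, which must be convertible to the Iceberg schema, and the `engineSchema` javadoc earlier mentions an N to 1 mapping between engine types and Iceberg types. A small sketch of why the Iceberg schema alone can be insufficient in that case: a hypothetical engine has separate byte, short and int types that are all stored as one file type, so the writer needs the engine type to know how to read each incoming value. `EngineType`, `fileTypeFor` and `toFileValue` are illustrative names, not part of the pull request.

```java
public class EngineSchemaSketch {

  /** Hypothetical engine-side integer types. */
  enum EngineType {
    BYTE,
    SHORT,
    INT
  }

  /** N to 1 mapping: every engine integer type is stored as the same file type in this sketch. */
  static String fileTypeFor(EngineType engineType) {
    switch (engineType) {
      case BYTE:
      case SHORT:
      case INT:
        return "int";
      default:
        throw new IllegalArgumentException("Unknown engine type: " + engineType);
    }
  }

  /**
   * Converts an engine value to the int stored in the file. The file type does not say which
   * boxed class arrives, so the engine type selects the cast before widening.
   */
  static int toFileValue(EngineType engineType, Object value) {
    switch (engineType) {
      case BYTE:
        return (Byte) value;
      case SHORT:
        return (Short) value;
      case INT:
        return (Integer) value;
      default:
        throw new IllegalArgumentException("Unknown engine type: " + engineType);
    }
  }

  public static void main(String[] args) {
    System.out.println(fileTypeFor(EngineType.SHORT));
    System.out.println(toFileValue(EngineType.SHORT, (short) 300));
  }
}
```

Treating a short value as if it were an int (`toFileValue(EngineType.INT, (short) 1)`) fails with a `ClassCastException` in this sketch, which is the kind of ambiguity an explicit engine schema is meant to remove.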