Merged
53 commits
b33aec4
Core, Data: File Format API interfaces
pvary Apr 11, 2025
5b79dbd
Renjie's comments
pvary Apr 29, 2025
a1daced
registerObjectModel exception handling fix
pvary Apr 30, 2025
79d7703
Removing the need for data.AppenderBuilder
pvary May 6, 2025
8404aa7
Fixed Renjie findings
pvary May 7, 2025
a00540d
Cosmentic changes
pvary May 15, 2025
2a4816a
Steven's comments
pvary May 19, 2025
ba16b59
ObjectModelRegistry->FileAccessFactory, and AppenederBuilder->WriteBu…
pvary May 20, 2025
9976bfb
Review comments by Steven and Russell (some javadoc, and a few method…
pvary May 21, 2025
62ea041
Added generic for the input/output of the reader/writer - like 'FileA…
pvary May 22, 2025
22ca3d7
New classes for implementation allows for absolutely new interfaces
pvary May 26, 2025
598f46d
Default methods for setting multiple config/meta values
pvary May 28, 2025
5ce49dc
Return FileReader instead of CloseableIterable from the ReadBuilder
pvary Jun 16, 2025
46a1742
Revert "Return FileReader instead of CloseableIterable from the ReadB…
pvary Jun 24, 2025
026e5e9
Ryan's comments
pvary Jun 24, 2025
3149874
Move interface classes to core
pvary Jun 25, 2025
4659adf
Rename FileAccessFactory to ObjectModelFactory
pvary Jun 25, 2025
40ec8bb
Ryan's next round of comments
pvary Jun 27, 2025
f20bb4e
Separate input conversion from witers
pvary Jul 22, 2025
7e91a40
Eduard's comments
pvary Jul 24, 2025
a957c47
Fixing parameter names
pvary Jul 24, 2025
8ce2f2e
Ryan's comments
pvary Aug 6, 2025
2b1b10b
Remove builder parameter from data file writers
pvary Aug 6, 2025
26e03b7
Remove parametrization as much as possible
pvary Aug 13, 2025
ef41daa
ContentFileWriteBuilder needs a generic parameter
pvary Aug 14, 2025
acb2254
Revert transformers, and used engine specific type setting for writer…
pvary Aug 25, 2025
3efd188
Steven's and Russel's comments
pvary Sep 11, 2025
e5611f4
Move to writeBuilder/positionDeleteWriteBuilder solution, and depreca…
pvary Sep 15, 2025
3a6a5ed
Use a specific FormatModel to write PositionDeletes
pvary Sep 24, 2025
fad5e07
Changes discussed on the sync
pvary Oct 2, 2025
790e282
Steven's comments
pvary Oct 8, 2025
224070e
Remove the FormatModelRegistry.writeBuilder method
pvary Oct 13, 2025
0c439c1
Eduard's comment
pvary Oct 17, 2025
ddcf866
Addressing Amogh's, Steven's and Gabor's comments
pvary Oct 20, 2025
2fa0c5f
Removing ReadBuilder.outputSchema per Steven's and Renjie's comments
pvary Oct 21, 2025
8f58a9e
Aihua's comments
pvary Oct 26, 2025
b22a91a
Move back to parametrized writers
pvary Nov 5, 2025
8085a4b
Encryption handling
pvary Nov 5, 2025
af4f8a3
Revert back to OutputFile in the FormatModel
pvary Nov 8, 2025
c17c9ca
Synchronized register method, and only provide writeBuilder, location…
pvary Nov 9, 2025
7c175ea
constantValues -> idToConstant
pvary Nov 9, 2025
7ed56a5
Remove new from the API parameter names
pvary Nov 9, 2025
69461a2
Move to EncryptedOutputFile from OutputFile FormatModel.writeBuilder
pvary Nov 10, 2025
fdbb6c0
Ryan's comments
pvary Nov 11, 2025
4ae6ba0
Fix typo in comment
pvary Nov 20, 2025
fe743e1
Revert back to prevent re-registering models
pvary Nov 20, 2025
b66e891
Ryan's comments
pvary Nov 21, 2025
d450ec6
Steven's comment to change the log message for failed registration
pvary Nov 21, 2025
564ba8c
Create a marker class for the Comet reader and a few extra nits
pvary Dec 9, 2025
17f030f
Added back the ReadBuilder.outputSchema method as the attirbute is us…
pvary Jan 19, 2026
74ca9f7
Changes from #12298 based on Ryan's comments
pvary Feb 5, 2026
6cf720a
Last fixes
pvary Feb 6, 2026
7f6cb33
Javadoc comment fixes
pvary Feb 6, 2026
1 change: 1 addition & 0 deletions build.gradle
@@ -373,6 +373,7 @@ project(':iceberg-data') {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
implementation project(':iceberg-common')
implementation project(':iceberg-core')
compileOnly project(':iceberg-parquet')
compileOnly project(':iceberg-orc')
109 changes: 109 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;

/**
* Interface which is implemented by the data file format implementations. The {@link ObjectModel}
* provides the {@link AppenderBuilder} for the given parameters:
*
* <ul>
* <li>file format
* <li>engine specific object model
* <li>{@link ObjectModel.WriteMode}
* </ul>
*
* The {@link AppenderBuilder} is used to write data to the target files.
*
* @param <B> type returned by builder API to allow chained calls
* @param <E> the engine specific schema of the input data
*/
public interface AppenderBuilder<B extends AppenderBuilder<B, E>, E> {
/** Set the file schema. */
B schema(Schema newSchema);

/**
* Set a writer configuration property which affects the writer behavior.
*
* @param property a writer config property name
* @param value config value
* @return this for method chaining
*/
B set(String property, String value);

default B set(Map<String, String> properties) {
properties.forEach(this::set);
return (B) this;
}

/**
* Set a file metadata property in the created file.
*
* @param property a file metadata property name
* @param value config value
* @return this for method chaining
*/
B meta(String property, String value);

/** Sets the metrics configuration used for collecting column metrics for the created file. */
B metricsConfig(MetricsConfig newMetricsConfig);

/** Overwrite the file if it already exists. By default, overwrite is disabled. */
B overwrite();

/**
* Overwrite the file if it already exists. The default value is <code>false</code>.
*
* @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility.
* Use {@link #overwrite()} instead.
*/
@Deprecated
B overwrite(boolean enabled);

/**
 * Sets the encryption key used for writing the file. If encryption is not supported by the
 * writer then an exception should be thrown.
*/
default B fileEncryptionKey(ByteBuffer encryptionKey) {
throw new UnsupportedOperationException("Not supported");
}

/**
 * Sets the additional authentication data prefix used for writing the file. If encryption is not
 * supported by the writer then an exception should be thrown.
*/
default B aadPrefix(ByteBuffer aadPrefix) {
throw new UnsupportedOperationException("Not supported");
}

/**
 * Sets the engine native schema for the input. Defines the input type when there is an N-to-1
 * mapping between the engine type and the Iceberg type, and the Iceberg schema alone is not
 * enough for the conversion.
*/
B engineSchema(E newEngineSchema);

/** Finalizes the configuration and builds the {@link FileAppender}. */
<D> FileAppender<D> build() throws IOException;
}
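The recursive type parameter `B extends AppenderBuilder<B, E>` above is the self-referential ("F-bounded") builder pattern: every method, including defaults like `set(Map)`, can return the concrete builder type so chained calls never lose access to subtype-specific methods. A minimal, self-contained sketch (toy classes, not Iceberg code) of how the pattern behaves:

```java
import java.util.HashMap;
import java.util.Map;

// Mirrors the shape of AppenderBuilder<B extends AppenderBuilder<B, E>, E>:
// the bound lets default methods return the concrete builder type B.
interface Builder<B extends Builder<B>> {
  B set(String property, String value);

  // Default bulk-set, like AppenderBuilder.set(Map): the unchecked cast is
  // safe by convention because implementations declare B as themselves.
  @SuppressWarnings("unchecked")
  default B set(Map<String, String> properties) {
    properties.forEach(this::set);
    return (B) this;
  }
}

// A hypothetical format-specific builder. Because set(Map) returns
// ParquetishBuilder (not Builder), chaining into compression() compiles.
final class ParquetishBuilder implements Builder<ParquetishBuilder> {
  final Map<String, String> props = new HashMap<>();

  @Override
  public ParquetishBuilder set(String property, String value) {
    props.put(property, value);
    return this;
  }

  // Subtype-only method, reachable after a chained default-method call.
  ParquetishBuilder compression(String codec) {
    return set("compression", codec);
  }
}

public class SelfTypeDemo {
  public static void main(String[] args) {
    ParquetishBuilder b =
        new ParquetishBuilder()
            .set(Map.of("a", "1", "b", "2")) // default method, keeps concrete type
            .compression("zstd");            // still ParquetishBuilder
    System.out.println(b.props.get("compression")); // zstd
    System.out.println(b.props.size());             // 3
  }
}
```

Without the recursive bound, `set(Map)` would return the raw `Builder` type and the chained `compression("zstd")` call would not compile.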
106 changes: 106 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/ObjectModel.java
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import org.apache.iceberg.FileFormat;

/**
* Direct conversion is used between file formats and engine internal formats for performance
* reasons. Object models encapsulate these conversions.
*
* <p>{@link ReadBuilder} is provided for reading data files stored in a given {@link FileFormat}
* into the engine specific object model.
*
* <p>{@link AppenderBuilder} is provided for writing engine specific object model to data/delete
* files stored in a given {@link FileFormat}.
*
* <p>Iceberg supports the following object models natively:
*
* <ul>
* <li>generic - reads and writes Iceberg {@link org.apache.iceberg.data.Record}s
* <li>spark - reads and writes Spark InternalRow records
* <li>spark-vectorized - vectorized reads for Spark columnar batches. Not supported for {@link
* FileFormat#AVRO}
* <li>flink - reads and writes Flink RowData records
 * <li>arrow - vectorized reads into the Arrow columnar format. Only supported for {@link
* FileFormat#PARQUET}
* </ul>
*
* <p>Engines could implement their own object models to leverage Iceberg data file reading and
* writing capabilities.
*
* @param <E> the engine specific schema of the input data for the appender
*/
public interface ObjectModel<E> {
/** The file format which is read/written by the object model. */
FileFormat format();

/**
* The name of the object model. Allows users to specify the object model to map the data file for
* reading and writing.
*/
String name();

/**
* The appender builder for the output file which writes the data in the specified file format and
* accepts the records defined by this object model. The 'mode' parameter defines the input type
* for the specific writer use-cases. The appender should handle the following input in the
* specific modes:
*
* <ul>
* <li>The appender's engine specific input type
* <ul>
* <li>{@link WriteMode#DATA_WRITER}
* <li>{@link WriteMode#EQUALITY_DELETE_WRITER}
* </ul>
* <li>{@link org.apache.iceberg.deletes.PositionDelete} where the type of the row is the
* appender's engine specific input type when the 'mode' is {@link
* WriteMode#POSITION_DELETE_WRITER}
* </ul>
*
* @param outputFile to write to
* @param mode for the appender
* @return the appender builder
* @param <B> The type of the appender builder
*/
<B extends AppenderBuilder<B, E>> B appenderBuilder(OutputFile outputFile, WriteMode mode);

/**
* The reader builder for the input file which reads the data from the specified file format and
* returns the records in this object model.
*
* @param inputFile to read from
* @return the reader builder
* @param <B> The type of the reader builder
*/
<B extends ReadBuilder<B>> B readBuilder(InputFile inputFile);

/**
* Writer modes. Based on the mode the object model could alter the appender configuration when
* creating the {@link FileAppender}.
*/
enum WriteMode {
/** Mode for writing data files. */
DATA_WRITER,
/** Mode for writing equality delete files. */
EQUALITY_DELETE_WRITER,
/** Mode for writing position delete files. */
POSITION_DELETE_WRITER,
}
}
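The `WriteMode` parameter exists so one object model can hand out differently configured appenders for data, equality-delete, and position-delete files. A self-contained toy sketch (hypothetical classes, not the Iceberg API) of that dispatch:

```java
import java.util.ArrayList;
import java.util.List;

enum WriteMode { DATA_WRITER, EQUALITY_DELETE_WRITER, POSITION_DELETE_WRITER }

// Simplified stand-ins for ObjectModel and AppenderBuilder.
interface ToyObjectModel {
  String format();
  String name();
  ToyAppenderBuilder appenderBuilder(String outputPath, WriteMode mode);
}

class ToyAppenderBuilder {
  final String path;
  final List<String> config = new ArrayList<>();

  ToyAppenderBuilder(String path) {
    this.path = path;
  }

  ToyAppenderBuilder set(String key, String value) {
    config.add(key + "=" + value);
    return this;
  }
}

// Hypothetical Avro + generic-record model: position-delete appenders get
// extra configuration because they accept PositionDelete wrappers rather
// than plain rows (names here are illustrative, not Iceberg's).
class AvroGenericModel implements ToyObjectModel {
  public String format() { return "avro"; }
  public String name() { return "generic"; }

  public ToyAppenderBuilder appenderBuilder(String outputPath, WriteMode mode) {
    ToyAppenderBuilder builder = new ToyAppenderBuilder(outputPath);
    if (mode == WriteMode.POSITION_DELETE_WRITER) {
      builder.set("row-wrapper", "position-delete");
    }
    return builder;
  }
}

public class ObjectModelDemo {
  public static void main(String[] args) {
    ToyObjectModel model = new AvroGenericModel();
    ToyAppenderBuilder data = model.appenderBuilder("/tmp/f1.avro", WriteMode.DATA_WRITER);
    ToyAppenderBuilder posDel =
        model.appenderBuilder("/tmp/f2.avro", WriteMode.POSITION_DELETE_WRITER);
    System.out.println(data.config.size());   // 0
    System.out.println(posDel.config.get(0)); // row-wrapper=position-delete
  }
}
```

The caller picks the mode; the object model decides what that mode means for the underlying file format.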
123 changes: 123 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/ReadBuilder.java
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.mapping.NameMapping;

/**
* File formats should implement this interface to provide a builder for reading data files. {@link
* ReadBuilder} reads the data files with the specified parameters. The returned objects are defined
* by the {@link ObjectModel} which is used to read the data.
*
* <p>This interface is directly exposed for the users to parameterize readers.
*
* @param <B> type returned by builder API to allow chained calls
*/
public interface ReadBuilder<B extends ReadBuilder<B>> {
/** The configuration key for the batch size in case of vectorized reads. */
String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch";

/**
* Restricts the read to the given range: [start, start + length).
*
* @param newStart the start position for this read
* @param newLength the length of the range this read should scan
*/
B split(long newStart, long newLength);

/** Read only the given columns. */
B project(Schema newSchema);

/**
* Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary
* records. Some readers might not be able to filter some part of the expression. In this case the
* reader might return unfiltered or partially filtered rows. It is the caller's responsibility to
* apply the filter again.
*
* @param newFilter the filter to set
* @param filterCaseSensitive whether the filtering is case-sensitive or not
*/
default B filter(Expression newFilter, boolean filterCaseSensitive) {
// Skip filtering if not available
return (B) this;
}

/**
* Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary
 * records. Some readers might not be able to filter some part of the expression. In this case the
* reader might return unfiltered or partially filtered rows. It is the caller's responsibility to
* apply the filter again. The default implementation sets the filter to be case-sensitive.
*
* @param newFilter the filter to set
*/
default B filter(Expression newFilter) {
return filter(newFilter, true);
}

/**
* Sets configuration key/value pairs for the reader. Reader builders should ignore configuration
* keys not known for them.
*/
default B set(String key, String value) {
// Skip configuration if not applicable
return (B) this;
}

/**
 * Enables reusing the containers returned by the reader. Decreases pressure on the GC. Readers
 * could decide to ignore this setting if container reuse is not supported.
*/
default B reuseContainers() {
// Skip container reuse configuration if not applicable
return (B) this;
}

/**
 * Accessors for constant field values. Used for calculating result values that come from
 * metadata rather than from the data files themselves. The keys of the map are the column ids,
 * the values are the accessors generating the values.
*/
B constantFieldAccessors(Map<Integer, ?> constantFieldAccessors);

/** Sets a mapping from external schema names to Iceberg type IDs. */
B withNameMapping(NameMapping newNameMapping);

/**
* Sets the file encryption key used for reading the file. If encryption is not supported by the
* reader then an exception should be thrown.
*/
default B withFileEncryptionKey(ByteBuffer encryptionKey) {
throw new UnsupportedOperationException("Not supported");
}

/**
* Sets the additional authentication data prefix for encryption. If encryption is not supported
* by the reader then an exception should be thrown.
*/
default B withAADPrefix(ByteBuffer aadPrefix) {
throw new UnsupportedOperationException("Not supported");
}

/** Builds the reader. */
<D> CloseableIterable<D> build();
}
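The `filter` javadoc above places responsibility on the caller: a builder that cannot push the filter down inherits the no-op default and returns unfiltered rows, so the engine must apply the predicate again on the result. A minimal, self-contained sketch of that contract (toy types, not Iceberg classes):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Simplified stand-in for ReadBuilder with its no-op filter default.
interface ToyReadBuilder<B extends ToyReadBuilder<B>> {
  @SuppressWarnings("unchecked")
  default B filter(Predicate<Integer> predicate) {
    return (B) this; // no pushdown support: rows come back unfiltered
  }

  List<Integer> build();
}

// A format that never overrides filter(): pushdown silently does nothing.
class NoPushdownReader implements ToyReadBuilder<NoPushdownReader> {
  public List<Integer> build() {
    return List.of(1, 2, 3, 4);
  }
}

public class ReadBuilderDemo {
  public static void main(String[] args) {
    Predicate<Integer> even = v -> v % 2 == 0;
    // The builder ignored the pushdown, so the caller filters again.
    List<Integer> rows =
        new NoPushdownReader().filter(even).build().stream()
            .filter(even)
            .collect(Collectors.toList());
    System.out.println(rows); // [2, 4]
  }
}
```

This is why the interface hedges with "might return unfiltered or partially filtered rows": pushdown is an optimization, while correctness is enforced by re-applying the residual filter on the caller's side.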
44 changes: 44 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/DataWriterBuilder.java
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data;

import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.DataWriter;

/**
* Builder for generating a {@link DataWriter}.
*
* @param <B> type of the builder
* @param <E> engine specific schema of the input records used for appender initialization
*/
public interface DataWriterBuilder<B extends DataWriterBuilder<B, E>, E>
extends FileWriterBuilderBase<B, E> {
/**
* Creates a writer which generates a {@link org.apache.iceberg.DataFile} based on the
* configurations set. The data writer will expect inputs defined by the {@link
* #engineSchema(Object)} which should be convertible to the Iceberg schema defined by {@link
* #schema(Schema)}.
*
* @param <D> the type of data that the writer will handle
* @return a {@link DataWriter} instance configured with the specified settings
* @throws IOException if an I/O error occurs during the creation of the writer
*/
<D> DataWriter<D> dataWriter() throws IOException;
}