From 9b9d5533c4f5471d2bcb978be880bbd167caf8ea Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 23 Feb 2026 11:43:05 -0500 Subject: [PATCH 1/7] add files transform and schematransform --- .../apache/beam/sdk/io/iceberg/AddFiles.java | 279 ++++++++++++++++++ .../AddFilesSchemaTransformProvider.java | 112 +++++++ 2 files changed, 391 insertions(+) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java new file mode 100644 index 000000000000..324e5e80e049 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetUtil; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +import java.util.List; + +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; 
+import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERROR_SCHEMA; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +public class AddFiles extends SchemaTransform { + private final IcebergCatalogConfig catalogConfig; + private final String tableIdentifier; + private final Duration intervalTrigger; + private final int numFilesTrigger; + private final @Nullable String locationPrefix; + private static final int DEFAULT_FILES_TRIGGER = 100_000; + private static final Duration DEFAULT_TRIGGER_INTERVAL = Duration.standardMinutes(10); + + public AddFiles( + IcebergCatalogConfig catalogConfig, + String tableIdentifier, + @Nullable String locationPrefix, + @Nullable Integer numFilesTrigger, + @Nullable Duration intervalTrigger) { + System.out.println("got catalog config: " + catalogConfig); + this.catalogConfig = catalogConfig; + this.tableIdentifier = tableIdentifier; + this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; + this.numFilesTrigger = numFilesTrigger != null ? 
numFilesTrigger : DEFAULT_FILES_TRIGGER; + this.locationPrefix = locationPrefix; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection filePaths = input.getSinglePCollection(); + Schema inputSchema = filePaths.getSchema(); + Preconditions.checkState( + inputSchema.getFieldCount() == 1 + && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING) + && !inputSchema.getField(0).getType().getNullable(), + "Incoming Row Schema contain only one (required) field of type String."); + + PCollectionTuple dataFiles = + filePaths + .apply( + "ExtractPaths", + MapElements.into(TypeDescriptors.strings()) + .via(row -> checkStateNotNull(row.getString(0)))) + .apply(Redistribute.arbitrarily()) + .apply( + "ConvertToDataFiles", + ParDo.of(new ConvertToDataFile(catalogConfig, tableIdentifier, locationPrefix)) + .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS))); + SchemaCoder sdfSchema; + try { + sdfSchema = SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA); + PCollection snapshots = + dataFiles + .get(DATA_FILES) + .setCoder(sdfSchema) + .apply("AddStaticKey", WithKeys.of((Void) null)) + .apply( + GroupIntoBatches.ofSize(numFilesTrigger) + .withMaxBufferingDuration(intervalTrigger)) + .apply("DropKey", Values.create()) + .apply("AddFilesToIceberg", ParDo.of(new AddFilesDoFn(catalogConfig, tableIdentifier))) + .setRowSchema(SnapshotInfo.getSchema()); + + return PCollectionRowTuple.of("snapshots", snapshots); + } + + static class ConvertToDataFile extends DoFn { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + public static final TupleTag ERRORS = new TupleTag<>(); + public static final TupleTag DATA_FILES = new TupleTag<>(); + public static final Schema ERROR_SCHEMA = + 
Schema.builder().addStringField("file").addStringField("error").build(); + private final @Nullable String prefix; + private static volatile @MonotonicNonNull Table sharedTable; + private boolean isPartitioned = false; + + public ConvertToDataFile( + IcebergCatalogConfig catalogConfig, String identifier, @Nullable String prefix) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.prefix = prefix; + } + + @Setup + public void start() { + if (sharedTable == null) { + synchronized (ConvertToDataFile.class) { + if (sharedTable == null) { + sharedTable = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + } + isPartitioned = sharedTable.spec().isPartitioned(); + Preconditions.checkArgument( + !isPartitioned || prefix != null, + "A location prefix must be specified when adding files to a partitioned table."); + } + + private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; + private static final String PREFIX_ERROR = "File did not start with the specified prefix"; + + @ProcessElement + public void process(@Element String filePath, MultiOutputReceiver output) { + final Table table = checkStateNotNull(sharedTable); + boolean isPartitioned = table.spec().isPartitioned(); + if (isPartitioned && !filePath.startsWith(checkStateNotNull(prefix))) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); + return; + } + + InputFile inputFile = table.io().newInputFile(filePath); + FileFormat format; + try { + format = inferFormat(inputFile.location()); + } catch (UnknownFormat e) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); + return; + } + Metrics metrics = getFileMetrics(inputFile, format, MetricsConfig.forTable(table)); + + String partitionPath = getPartitionPath(filePath); + DataFile df = + DataFiles.builder(table.spec()) + .withPath(filePath) + .withFormat(format) + 
.withMetrics(metrics) + .withFileSizeInBytes(inputFile.getLength()) + .withPartitionPath(partitionPath) + .build(); + + output.get(DATA_FILES).output(SerializableDataFile.from(df, partitionPath)); + } + + private String getPartitionPath(String filePath) { + if (!isPartitioned) { + return ""; + } + String partitionPath = filePath.substring(checkStateNotNull(prefix).length()); + List components = Lists.newArrayList(Splitter.on('/').split(partitionPath)); + if (!components.isEmpty()) { + components = components.subList(0, components.size() - 1); + } + return String.join("/", components); + } + } + + static class AddFilesDoFn extends DoFn, Row> { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + private @MonotonicNonNull Table table = null; + + public AddFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + } + + @Setup + public void start() { + table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + + @ProcessElement + public void process(@Element Iterable files, OutputReceiver output) { + if (!files.iterator().hasNext()) { + return; + } + Table table = checkStateNotNull(this.table); + + AppendFiles appendFiles = table.newAppend(); + for (SerializableDataFile file : files) { + DataFile df = file.createDataFile(table.specs()); + appendFiles.appendFile(df); + } + + appendFiles.commit(); + Snapshot snapshot = table.currentSnapshot(); + output.output(SnapshotInfo.fromSnapshot(snapshot).toRow()); + } + } + + @SuppressWarnings("argument") + public static Metrics getFileMetrics(InputFile file, FileFormat format, MetricsConfig config) { + switch (format) { + case PARQUET: + return ParquetUtil.fileMetrics(file, config); + case ORC: + return OrcMetrics.fromInputFile(file, config); + case AVRO: + return new Metrics(Avro.rowCount(file), null, null, null, null); + default: + throw new UnsupportedOperationException("Unsupported 
format: " + format); + } + } + + public static FileFormat inferFormat(String path) { + String lowerPath = path.toLowerCase(); + + if (lowerPath.endsWith(".parquet") || lowerPath.endsWith(".pqt")) { + return FileFormat.PARQUET; + } else if (lowerPath.endsWith(".orc")) { + return FileFormat.ORC; + } else if (lowerPath.endsWith(".avro")) { + return FileFormat.AVRO; + } + + throw new UnknownFormat(); + } + + static class UnknownFormat extends IllegalArgumentException {} +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java new file mode 100644 index 000000000000..b402902cba1d --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +import java.util.Map; + +import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; + +@AutoService(SchemaTransformProvider.class) +public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider { + @Override + public AddFiles from(Configuration configuration) { + @Nullable Integer frequency = configuration.getTriggeringFrequencySeconds(); + + return new AddFiles( + configuration.getIcebergCatalog(), + configuration.getTable(), + configuration.getLocationPrefix(), + configuration.getAppendBatchSize(), + frequency != null ? Duration.standardSeconds(frequency) : null); + } + + @Override + public String identifier() { + return "beam:schematransform:iceberg_add_files:v1"; + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + public static Builder builder() { + return new AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("A fully-qualified table identifier.") + public abstract String getTable(); + + @SchemaFieldDescription("Properties used to set up the Iceberg catalog.") + public abstract @Nullable Map getCatalogProperties(); + + @SchemaFieldDescription("Properties passed to the Hadoop ") + public abstract @Nullable Map getConfigProperties(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the frequency at which incoming files are appended. 
Defaults to 600 (10 minutes). " + + "A commit is triggered when either this or append batch size is reached.") + public abstract @Nullable Integer getTriggeringFrequencySeconds(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the desired number of appended files per commit. Defaults to 100,000 files. " + + "A commit is triggered when either this or append triggering interval is reached.") + public abstract @Nullable Integer getAppendBatchSize(); + + @SchemaFieldDescription( + "The prefix shared among all partitions. For example, a data file may have the following" + + " location:\n" + + "'file:/Users/user/Documents/tmp/namespace/table_name/data/id=13/name=beam/data_file.parquet'\n\n" + + "The provided prefix should go up until the partition information:\n" + + "'file:/Users/user/Documents/tmp/namespace/table_name/data/'.\n" + + "Required if the table is partitioned. ") + public abstract @Nullable String getLocationPrefix(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTable(String table); + + public abstract Builder setCatalogProperties(Map catalogProperties); + + public abstract Builder setConfigProperties(Map confProperties); + + public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds); + + public abstract Builder setAppendBatchSize(Integer size); + + public abstract Builder setLocationPrefix(String prefix); + + public abstract Configuration build(); + } + + public IcebergCatalogConfig getIcebergCatalog() { + return IcebergCatalogConfig.builder() + .setCatalogProperties(getCatalogProperties()) + .setConfigProperties(getConfigProperties()) + .build(); + } + } +} From 98328b8e85bed28a66298c5bbdab490433ef7e2c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 24 Feb 2026 13:01:21 -0500 Subject: [PATCH 2/7] minor fixes --- .../org/apache/beam/sdk/io/iceberg/AddFiles.java | 14 ++++++++------ .../iceberg/AddFilesSchemaTransformProvider.java | 7 +++---- 2 files 
changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index 324e5e80e049..9ed3a52cfff6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; @@ -77,7 +78,6 @@ public AddFiles( @Nullable String locationPrefix, @Nullable Integer numFilesTrigger, @Nullable Duration intervalTrigger) { - System.out.println("got catalog config: " + catalogConfig); this.catalogConfig = catalogConfig; this.tableIdentifier = tableIdentifier; this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; @@ -91,12 +91,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { Schema inputSchema = filePaths.getSchema(); Preconditions.checkState( inputSchema.getFieldCount() == 1 - && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING) - && !inputSchema.getField(0).getType().getNullable(), - "Incoming Row Schema contain only one (required) field of type String."); + && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING), + "Incoming Row Schema must contain only one field of type String. 
Instead, got schema: %s", + inputSchema); PCollectionTuple dataFiles = filePaths + .apply("Filter empty paths", Filter.by(row -> row.getString(0) != null)) .apply( "ExtractPaths", MapElements.into(TypeDescriptors.strings()) @@ -112,7 +113,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } catch (Exception e) { throw new RuntimeException(e); } - dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA); + PCollection snapshots = dataFiles .get(DATA_FILES) @@ -125,7 +126,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .apply("AddFilesToIceberg", ParDo.of(new AddFilesDoFn(catalogConfig, tableIdentifier))) .setRowSchema(SnapshotInfo.getSchema()); - return PCollectionRowTuple.of("snapshots", snapshots); + return PCollectionRowTuple.of( + "snapshots", snapshots, "errors", dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA)); } static class ConvertToDataFile extends DoFn { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java index b402902cba1d..8322667ac656 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -17,8 +17,11 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; @@ -27,10 +30,6 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; -import java.util.Map; - -import static 
org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; - @AutoService(SchemaTransformProvider.class) public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider { @Override From 49936abbd61b481a553c0057ce9653cfa4a8984c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 5 Mar 2026 12:28:59 -0500 Subject: [PATCH 3/7] add tests --- .../apache/beam/sdk/io/iceberg/AddFiles.java | 83 +++-- .../AddFilesSchemaTransformProvider.java | 8 +- .../beam/sdk/io/iceberg/AddFilesTest.java | 332 ++++++++++++++++++ 3 files changed, 388 insertions(+), 35 deletions(-) create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index 9ed3a52cfff6..caf966d9a819 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -17,6 +17,15 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERROR_SCHEMA; +import static org.apache.beam.sdk.metrics.Metrics.counter; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; @@ -37,8 +46,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptors; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; @@ -55,22 +63,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.List; - -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERROR_SCHEMA; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; - +/** + * A transform that takes in a stream of file paths, converts them to Iceberg {@link DataFile}s with + * partition metadata and metrics, then commits them to an Iceberg {@link Table}. 
+ */ public class AddFiles extends SchemaTransform { + private static final Duration DEFAULT_TRIGGER_INTERVAL = Duration.standardMinutes(10); + private static final Counter numFilesAdded = counter(AddFiles.class, "numFilesAdded"); + private static final Counter numErrorFiles = counter(AddFiles.class, "numErrorFiles"); + private static final Logger LOG = LoggerFactory.getLogger(AddFiles.class); + private static final int DEFAULT_FILES_TRIGGER = 100_000; private final IcebergCatalogConfig catalogConfig; private final String tableIdentifier; private final Duration intervalTrigger; private final int numFilesTrigger; private final @Nullable String locationPrefix; - private static final int DEFAULT_FILES_TRIGGER = 100_000; - private static final Duration DEFAULT_TRIGGER_INTERVAL = Duration.standardMinutes(10); public AddFiles( IcebergCatalogConfig catalogConfig, @@ -87,6 +97,15 @@ public AddFiles( @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { + LOG.info( + "AddFiles configured to commit after accumulating {} files, or after {} seconds.", + numFilesTrigger, + intervalTrigger.getStandardSeconds()); + if (!Strings.isNullOrEmpty(locationPrefix)) { + LOG.info( + "AddFiles configured to build partition metadata after the prefix: '{}'", locationPrefix); + } + PCollection filePaths = input.getSinglePCollection(); Schema inputSchema = filePaths.getSchema(); Preconditions.checkState( @@ -123,7 +142,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { GroupIntoBatches.ofSize(numFilesTrigger) .withMaxBufferingDuration(intervalTrigger)) .apply("DropKey", Values.create()) - .apply("AddFilesToIceberg", ParDo.of(new AddFilesDoFn(catalogConfig, tableIdentifier))) + .apply( + "CommitFilesToIceberg", + ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier))) .setRowSchema(SnapshotInfo.getSchema()); return PCollectionRowTuple.of( @@ -138,7 +159,7 @@ static class ConvertToDataFile extends DoFn { public static final Schema ERROR_SCHEMA = 
Schema.builder().addStringField("file").addStringField("error").build(); private final @Nullable String prefix; - private static volatile @MonotonicNonNull Table sharedTable; + private static final Map tableCache = new ConcurrentHashMap<>(); private boolean isPartitioned = false; public ConvertToDataFile( @@ -150,30 +171,27 @@ public ConvertToDataFile( @Setup public void start() { - if (sharedTable == null) { - synchronized (ConvertToDataFile.class) { - if (sharedTable == null) { - sharedTable = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); - } - } - } - isPartitioned = sharedTable.spec().isPartitioned(); + Table table = + tableCache.computeIfAbsent( + identifier, (id) -> catalogConfig.catalog().loadTable(TableIdentifier.parse(id))); + isPartitioned = table.spec().isPartitioned(); Preconditions.checkArgument( !isPartitioned || prefix != null, "A location prefix must be specified when adding files to a partitioned table."); } private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; - private static final String PREFIX_ERROR = "File did not start with the specified prefix"; + private static final String PREFIX_ERROR = "File path did not start with the specified prefix"; @ProcessElement public void process(@Element String filePath, MultiOutputReceiver output) { - final Table table = checkStateNotNull(sharedTable); + final Table table = checkStateNotNull(tableCache.get(identifier)); boolean isPartitioned = table.spec().isPartitioned(); if (isPartitioned && !filePath.startsWith(checkStateNotNull(prefix))) { output .get(ERRORS) .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); + numErrorFiles.inc(); return; } @@ -185,6 +203,7 @@ public void process(@Element String filePath, MultiOutputReceiver output) { output .get(ERRORS) .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); + numErrorFiles.inc(); return; } Metrics metrics = 
getFileMetrics(inputFile, format, MetricsConfig.forTable(table)); @@ -207,20 +226,18 @@ private String getPartitionPath(String filePath) { return ""; } String partitionPath = filePath.substring(checkStateNotNull(prefix).length()); - List components = Lists.newArrayList(Splitter.on('/').split(partitionPath)); - if (!components.isEmpty()) { - components = components.subList(0, components.size() - 1); - } - return String.join("/", components); + int lastSlashIndex = partitionPath.lastIndexOf('/'); + + return lastSlashIndex > 0 ? partitionPath.substring(0, lastSlashIndex) : ""; } } - static class AddFilesDoFn extends DoFn, Row> { + static class CommitFilesDoFn extends DoFn, Row> { private final IcebergCatalogConfig catalogConfig; private final String identifier; - private @MonotonicNonNull Table table = null; + private transient @MonotonicNonNull Table table = null; - public AddFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) { + public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) { this.catalogConfig = catalogConfig; this.identifier = identifier; } @@ -237,13 +254,17 @@ public void process(@Element Iterable files, OutputReceive } Table table = checkStateNotNull(this.table); + int numFiles = 0; AppendFiles appendFiles = table.newAppend(); for (SerializableDataFile file : files) { DataFile df = file.createDataFile(table.specs()); appendFiles.appendFile(df); + numFiles++; } appendFiles.commit(); + numFilesAdded.inc(numFiles); + Snapshot snapshot = table.currentSnapshot(); output.output(SnapshotInfo.fromSnapshot(snapshot).toRow()); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java index 8322667ac656..ed3ae5bded5a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java +++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -77,10 +77,10 @@ public static Builder builder() { @SchemaFieldDescription( "The prefix shared among all partitions. For example, a data file may have the following" - + " location:\n" - + "'file:/Users/user/Documents/tmp/namespace/table_name/data/id=13/name=beam/data_file.parquet'\n\n" - + "The provided prefix should go up until the partition information:\n" - + "'file:/Users/user/Documents/tmp/namespace/table_name/data/'.\n" + + " location:%n" + + "'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n" + + "The provided prefix should go up until the partition information:%n" + + "'gs://bucket/namespace/table/data/'.%n" + "Required if the table is partitioned. ") public abstract @Nullable String getLocationPrefix(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java new file mode 100644 index 000000000000..a2f48a64d482 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import 
org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class AddFilesTest { + @Rule public TemporaryFolder temp = new TemporaryFolder(); + private String root; + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private HadoopCatalog catalog; + private TableIdentifier tableId; + private final org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "age", Types.IntegerType.get())); + private final PartitionSpec spec = + PartitionSpec.builderFor(icebergSchema).identity("age").truncate("name", 3).build(); + private final PartitionKey wrapper = new PartitionKey(spec, icebergSchema); + private IcebergCatalogConfig catalogConfig; + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Rule public TestName testName = new TestName(); + + @Rule + public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + private static final Schema INPUT_SCHEMA = Schema.builder().addStringField("filepath").build(); + + @Before + public void setup() throws Exception { + // Root for existing data files: + root = temp.getRoot().getAbsolutePath() + "/"; + + // Set up a local Hadoop Catalog using the temporary folder + catalog = new HadoopCatalog(new Configuration(), warehouse.location); + tableId = TableIdentifier.of("default", testName.getMethodName()); + + // Create an unpartitioned table by default, and full table metrics enabled + catalog.createTable( + tableId, + icebergSchema, + PartitionSpec.unpartitioned(), + ImmutableMap.of("write.metadata.metrics.default", "full")); + + catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogProperties( + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)) + .build(); + } + + @Test + 
public void testPartitionedFilesAreCommitted() throws Exception { + testFilesAreCommittedToIceberg(true); + } + + @Test + public void testUnPartitionedFilesAreCommitted() throws Exception { + testFilesAreCommittedToIceberg(false); + } + + public void testFilesAreCommittedToIceberg(boolean isPartitioned) throws Exception { + if (isPartitioned) { + // recreate table with the partition spec + catalog.dropTable(tableId); + catalog.createTable( + tableId, icebergSchema, spec, ImmutableMap.of("write.metadata.metrics.default", "full")); + } + + // 1. Generate two local Parquet file. + // Include Hive-like partition path if testing partition case + String partitionPath1 = isPartitioned ? "age=20/name_trunc=Mar/" : ""; + String file1 = root + partitionPath1 + "data1.parquet"; + wrapper.wrap(record(-1, "Mar", 20)); + DataWriter writer = createWriter(file1, isPartitioned ? wrapper.copy() : null); + writer.write(record(1, "Mark", 20)); + writer.write(record(2, "Martin", 20)); + writer.close(); + + String partitionPath2 = isPartitioned ? "age=25/name_trunc=Sam/" : ""; + String file2 = root + partitionPath2 + "data2.parquet"; + wrapper.wrap(record(-1, "Sam", 25)); + DataWriter writer2 = createWriter(file2, isPartitioned ? wrapper.copy() : null); + writer2.write(record(3, "Samantha", 25)); + writer2.write(record(4, "Sammy", 25)); + writer2.close(); + + // 2. Setup the input PCollection + Row row1 = Row.withSchema(INPUT_SCHEMA).addValue(file1).build(); + Row row2 = Row.withSchema(INPUT_SCHEMA).addValue(file2).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(row1, row2).withRowSchema(INPUT_SCHEMA)); + + // 3. Apply the transform (Trigger aggressively for testing) + PCollectionRowTuple output = + PCollectionRowTuple.of("input", inputFiles) + .apply( + new AddFiles( + catalogConfig, + tableId.toString(), + isPartitioned ? root : null, + 2, // trigger at 2 files + Duration.standardSeconds(1))); + + // 4. 
Validate PCollection Outputs + PAssert.that(output.get("errors")).empty(); + + // 5. Run the pipeline + pipeline.run().waitUntilFinish(); + + // 6. Validate the Iceberg Table state directly + Table table = catalog.loadTable(tableId); + + // Check that we have exactly 1 snapshot with 2 files + assertEquals(1, Iterables.size(table.snapshots())); + + List addedFiles = + Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io())); + assertEquals(2, addedFiles.size()); + + // Verify file paths + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data1.parquet"))); + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data2.parquet"))); + + // check metrics metadata is preserved + DataFile writtenDf1 = writer.toDataFile(); + DataFile writtenDf2 = writer2.toDataFile(); + DataFile addedDf1 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data1.parquet")) + .collect(Collectors.toList())); + DataFile addedDf2 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data2.parquet")) + .collect(Collectors.toList())); + + assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds()); + assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds()); + assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds()); + assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds()); + + // check partition metadata is preserved + assertEquals(writtenDf1.partition(), addedDf1.partition()); + assertEquals(writtenDf2.partition(), addedDf2.partition()); + } + + @Test + public void testStreamingAdds() throws IOException { + List paths = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + String file = String.format("%sdata_%s.parquet", root, i); + DataWriter writer = createWriter(file); + writer.write(record(1, "SomeName", 30)); + writer.close(); + paths.add(Row.withSchema(INPUT_SCHEMA).addValue(file).build()); + } + + PCollection files = + pipeline.apply( + 
TestStream.create(INPUT_SCHEMA) + .addElements( + paths.get(0), paths.subList(1, 15).toArray(new Row[] {})) // should commit twice + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements( + paths.get(15), + paths.subList(16, 40).toArray(new Row[] {})) // should commit 3 times + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements( + paths.get(40), + paths.subList(41, 45).toArray(new Row[] {})) // should commit once + .advanceWatermarkToInfinity()); + + PCollectionRowTuple.of("input", files) + .apply( + new AddFiles( + catalogConfig, + tableId.toString(), + null, + 10, // trigger at 10 files + Duration.standardSeconds(5))); + pipeline.run().waitUntilFinish(); + + Table table = catalog.loadTable(tableId); + + List snapshots = Lists.newArrayList(table.snapshots()); + snapshots.sort(Comparator.comparingLong(Snapshot::timestampMillis)); + + assertEquals(6, snapshots.size()); + assertEquals(10, Iterables.size(snapshots.get(0).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(1).addedDataFiles(table.io()))); + assertEquals(10, Iterables.size(snapshots.get(2).addedDataFiles(table.io()))); + assertEquals(10, Iterables.size(snapshots.get(3).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(4).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(5).addedDataFiles(table.io()))); + } + + @Test + public void testUnknownFormatErrors() throws Exception { + // Create a dummy text file (unsupported extension) + File txtFile = temp.newFile("unsupported.txt"); + txtFile.createNewFile(); + + Row badRow = Row.withSchema(INPUT_SCHEMA).addValue(txtFile.getAbsolutePath()).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(badRow).withRowSchema(INPUT_SCHEMA)); + + AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), null, 1, null); + PCollectionRowTuple outputTuple = PCollectionRowTuple.of("input", inputFiles).apply(addFiles); + + // Validate 
the file ended up in the errors PCollection with the correct schema + PAssert.that(outputTuple.get("errors")) + .containsInAnyOrder( + Row.withSchema(AddFiles.ConvertToDataFile.ERROR_SCHEMA) + .addValues(txtFile.getAbsolutePath(), "Could not determine the file's format") + .build()); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testPartitionPrefixErrors() throws Exception { + // Drop unpartitioned table and create a partitioned one + catalog.dropTable(tableId); + PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).identity("name").build(); + catalog.createTable(tableId, icebergSchema, spec); + + String file1 = root + "data1.parquet"; + wrapper.wrap(record(-1, "And", 30)); + DataWriter writer = createWriter(file1, wrapper.copy()); + writer.write(record(1, "Andrew", 30)); + writer.close(); + + Row row1 = Row.withSchema(INPUT_SCHEMA).addValue(file1).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(row1).withRowSchema(INPUT_SCHEMA)); + + // Notice locationPrefix is "some/prefix/" but the absolute path doesn't start with it + AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), "some/prefix/", 1, null); + PCollectionRowTuple outputTuple = PCollectionRowTuple.of("input", inputFiles).apply(addFiles); + + PAssert.that(outputTuple.get("errors")) + .containsInAnyOrder( + Row.withSchema(AddFiles.ConvertToDataFile.ERROR_SCHEMA) + .addValues(file1, "File path did not start with the specified prefix") + .build()); + + pipeline.run().waitUntilFinish(); + } + + private DataWriter createWriter(String file) throws IOException { + return createWriter(file, null); + } + + private DataWriter createWriter(String file, @Nullable StructLike partition) + throws IOException { + return Parquet.writeData(Files.localOutput(file)) + .schema(icebergSchema) + .withSpec(partition != null ? 
spec : PartitionSpec.unpartitioned()) + .withPartition(partition) + .createWriterFunc(GenericParquetWriter::create) + .build(); + } + + private Record record(int id, String name, int age) { + return GenericRecord.create(icebergSchema).copy("id", id, "name", name, "age", age); + } +} From 3b2e86dbfcd2909ad5f1cce89945e87511b860da Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 7 Mar 2026 15:09:05 -0500 Subject: [PATCH 4/7] create table if needed; determine partition spec from metrics --- sdks/java/io/iceberg/build.gradle | 4 +- .../apache/beam/sdk/io/iceberg/AddFiles.java | 337 +++++++++++++++--- .../AddFilesSchemaTransformProvider.java | 29 +- .../beam/sdk/io/iceberg/PartitionUtils.java | 5 +- .../beam/sdk/io/iceberg/AddFilesTest.java | 164 +++++++-- .../apache/beam/sdk/io/parquet/ParquetIO.java | 4 +- 6 files changed, 458 insertions(+), 85 deletions(-) diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index c188e4f63853..b185a962415f 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -51,6 +51,8 @@ dependencies { implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" implementation "org.apache.parquet:parquet-hadoop:$parquet_version" + implementation "org.apache.parquet:parquet-common:$parquet_version" + implementation project(":sdks:java:io:parquet") implementation "org.apache.orc:orc-core:$orc_version" implementation "org.apache.iceberg:iceberg-core:$iceberg_version" implementation "org.apache.iceberg:iceberg-api:$iceberg_version" @@ -74,11 +76,11 @@ dependencies { testImplementation library.java.bigdataoss_gcsio testImplementation library.java.bigdataoss_util_hadoop testImplementation "org.apache.parquet:parquet-avro:$parquet_version" - testImplementation "org.apache.parquet:parquet-common:$parquet_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", 
configuration: "shadowTest") testImplementation project(":sdks:java:extensions:google-cloud-platform-core") testImplementation library.java.junit + testImplementation library.java.hamcrest // Hive catalog test dependencies testImplementation project(path: ":sdks:java:io:iceberg:hive") diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index caf966d9a819..afdc92ba186d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -19,13 +19,24 @@ import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERROR_SCHEMA; import static org.apache.beam.sdk.metrics.Metrics.counter; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.BeamParquetInputFile; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; @@ -36,8 +47,9 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import 
org.apache.beam.sdk.transforms.Redistribute; -import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PCollectionTuple; @@ -47,19 +59,30 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.schema.MessageType; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -76,20 +99,29 @@ public class AddFiles extends SchemaTransform { private static final Counter numErrorFiles = counter(AddFiles.class, 
"numErrorFiles"); private static final Logger LOG = LoggerFactory.getLogger(AddFiles.class); private static final int DEFAULT_FILES_TRIGGER = 100_000; + static final Schema ERROR_SCHEMA = + Schema.builder().addStringField("file").addStringField("error").build(); private final IcebergCatalogConfig catalogConfig; private final String tableIdentifier; private final Duration intervalTrigger; private final int numFilesTrigger; private final @Nullable String locationPrefix; + private final @Nullable List partitionFields; + private final @Nullable Map tableProps; + private final String jobId = UUID.randomUUID().toString(); public AddFiles( IcebergCatalogConfig catalogConfig, String tableIdentifier, @Nullable String locationPrefix, + @Nullable List partitionFields, + @Nullable Map tableProps, @Nullable Integer numFilesTrigger, @Nullable Duration intervalTrigger) { this.catalogConfig = catalogConfig; this.tableIdentifier = tableIdentifier; + this.partitionFields = partitionFields; + this.tableProps = tableProps; this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; this.numFilesTrigger = numFilesTrigger != null ? 
numFilesTrigger : DEFAULT_FILES_TRIGGER; this.locationPrefix = locationPrefix; @@ -106,8 +138,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { "AddFiles configured to build partition metadata after the prefix: '{}'", locationPrefix); } - PCollection filePaths = input.getSinglePCollection(); - Schema inputSchema = filePaths.getSchema(); + Schema inputSchema = input.getSinglePCollection().getSchema(); Preconditions.checkState( inputSchema.getFieldCount() == 1 && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING), @@ -115,7 +146,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { inputSchema); PCollectionTuple dataFiles = - filePaths + input + .getSinglePCollection() .apply("Filter empty paths", Filter.by(row -> row.getString(0) != null)) .apply( "ExtractPaths", @@ -124,7 +156,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .apply(Redistribute.arbitrarily()) .apply( "ConvertToDataFiles", - ParDo.of(new ConvertToDataFile(catalogConfig, tableIdentifier, locationPrefix)) + ParDo.of( + new ConvertToDataFile( + catalogConfig, + tableIdentifier, + locationPrefix, + partitionFields, + tableProps)) .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS))); SchemaCoder sdfSchema; try { @@ -141,10 +179,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .apply( GroupIntoBatches.ofSize(numFilesTrigger) .withMaxBufferingDuration(intervalTrigger)) - .apply("DropKey", Values.create()) .apply( "CommitFilesToIceberg", - ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier))) + ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier, jobId))) .setRowSchema(SnapshotInfo.getSchema()); return PCollectionRowTuple.of( @@ -156,59 +193,84 @@ static class ConvertToDataFile extends DoFn { private final String identifier; public static final TupleTag ERRORS = new TupleTag<>(); public static final TupleTag DATA_FILES = new TupleTag<>(); - public static final Schema ERROR_SCHEMA = - 
Schema.builder().addStringField("file").addStringField("error").build(); private final @Nullable String prefix; - private static final Map tableCache = new ConcurrentHashMap<>(); - private boolean isPartitioned = false; + private final @Nullable List partitionFields; + private final @Nullable Map tableProps; + private transient @MonotonicNonNull Table table; public ConvertToDataFile( - IcebergCatalogConfig catalogConfig, String identifier, @Nullable String prefix) { + IcebergCatalogConfig catalogConfig, + String identifier, + @Nullable String prefix, + @Nullable List partitionFields, + @Nullable Map tableProps) { this.catalogConfig = catalogConfig; this.identifier = identifier; this.prefix = prefix; + this.partitionFields = partitionFields; + this.tableProps = tableProps; } - @Setup - public void start() { - Table table = - tableCache.computeIfAbsent( - identifier, (id) -> catalogConfig.catalog().loadTable(TableIdentifier.parse(id))); - isPartitioned = table.spec().isPartitioned(); - Preconditions.checkArgument( - !isPartitioned || prefix != null, - "A location prefix must be specified when adding files to a partitioned table."); - } - - private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; private static final String PREFIX_ERROR = "File path did not start with the specified prefix"; + private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; + static final String UNKNOWN_PARTITION_ERROR = "Could not determine the file's partition"; @ProcessElement - public void process(@Element String filePath, MultiOutputReceiver output) { - final Table table = checkStateNotNull(tableCache.get(identifier)); - boolean isPartitioned = table.spec().isPartitioned(); - if (isPartitioned && !filePath.startsWith(checkStateNotNull(prefix))) { + public void process(@Element String filePath, MultiOutputReceiver output) throws IOException { + FileFormat format; + try { + format = inferFormat(filePath); + } catch 
(UnknownFormatException e) { output .get(ERRORS) - .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); numErrorFiles.inc(); return; } - InputFile inputFile = table.io().newInputFile(filePath); - FileFormat format; - try { - format = inferFormat(inputFile.location()); - } catch (UnknownFormat e) { + if (table == null) { + table = getOrCreateTable(getSchema(filePath, format)); + } + + // Check if the file path contains the provided prefix + if (table.spec().isPartitioned() + && !Strings.isNullOrEmpty(prefix) + && !filePath.startsWith(checkStateNotNull(prefix))) { output .get(ERRORS) - .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); numErrorFiles.inc(); return; } + + InputFile inputFile = table.io().newInputFile(filePath); + Metrics metrics = getFileMetrics(inputFile, format, MetricsConfig.forTable(table)); - String partitionPath = getPartitionPath(filePath); + // Figure out which partition this DataFile should go to + String partitionPath; + if (table.spec().isUnpartitioned()) { + partitionPath = ""; + } else if (!Strings.isNullOrEmpty(prefix)) { + // option 1: use directory structure to determine partition + // Note: we don't validate the DataFile content here + partitionPath = getPartitionFromFilePath(filePath); + } else { + try { + // option 2: examine DataFile min/max statistics to determine partition + partitionPath = getPartitionFromMetrics(metrics, inputFile); + } catch (UnknownPartitionException e) { + output + .get(ERRORS) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(filePath, UNKNOWN_PARTITION_ERROR + ": " + e.getMessage()) + .build()); + numErrorFiles.inc(); + return; + } + } + DataFile df = DataFiles.builder(table.spec()) .withPath(filePath) @@ -221,8 +283,50 @@ public void process(@Element String 
filePath, MultiOutputReceiver output) { output.get(DATA_FILES).output(SerializableDataFile.from(df, partitionPath)); } - private String getPartitionPath(String filePath) { - if (!isPartitioned) { + private T transformValue(Transform transform, Type type, ByteBuffer bytes) { + S value = Conversions.fromByteBuffer(type, bytes); + return transform.bind(type).apply(value); + } + + private Table getOrCreateTable(org.apache.iceberg.Schema schema) { + PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); + try { + return tableProps == null + ? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) + : catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); + } catch (AlreadyExistsException e) { // if table already exists, just load it + return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + + /** + * We don't have a table yet, so we don't know which FileIO to use to read these files. 
Instead, + * we use Beam's FileSystem utilities to read the file and extract its schema to create the + * table + */ + private static org.apache.iceberg.Schema getSchema(String filePath, FileFormat format) + throws IOException { + Preconditions.checkArgument( + format.equals(FileFormat.PARQUET), "Table creation is only supported for Parquet files."); + FileSystems.registerFileSystemsOnce(PipelineOptionsFactory.create()); + + MatchResult result = FileSystems.match(filePath); + ResourceId resourceId = Iterables.getOnlyElement(result.metadata()).resourceId(); + Compression compression = Compression.detect(checkStateNotNull(resourceId.getFilename())); + SeekableByteChannel channel = + (SeekableByteChannel) compression.readDecompressed(FileSystems.open(resourceId)); + BeamParquetInputFile file = new BeamParquetInputFile(channel); + + try (ParquetFileReader reader = ParquetFileReader.open(file)) { + MessageType messageType = reader.getFooter().getFileMetaData().getSchema(); + return ParquetSchemaUtil.convert(messageType); + } + } + + private String getPartitionFromFilePath(String filePath) { + if (checkStateNotNull(table).spec().isUnpartitioned()) { return ""; } String partitionPath = filePath.substring(checkStateNotNull(prefix).length()); @@ -230,43 +334,166 @@ private String getPartitionPath(String filePath) { return lastSlashIndex > 0 ? partitionPath.substring(0, lastSlashIndex) : ""; } + + /** + * Examines the min/max values of each partition column to determine the destination partition. + * + *

If the transformed min/max values are not equal for any given column, we won't be able to + * determine the partition. In this case, we output the DataFile to the DLQ, because assigning + * an incorrect partition may lead to it being hidden from some queries. + */ + private String getPartitionFromMetrics(Metrics metrics, InputFile inputFile) + throws UnknownPartitionException { + Table table = checkStateNotNull(this.table); + List fields = table.spec().fields(); + List sourceIds = + fields.stream().map(PartitionField::sourceId).collect(Collectors.toList()); + Metrics partitionMetrics; + // Check if metrics already includes partition columns (this is configured by table + // properties): + if (metrics.lowerBounds().keySet().containsAll(sourceIds) + && metrics.upperBounds().keySet().containsAll(sourceIds)) { + partitionMetrics = metrics; + } else { + // Otherwise, recollect metrics and ensure it includes all partition fields. + // Note: we don't attach these additional metrics to the DataFile because we can't assume + // that's in the user's best interest. + // Some tables are very wide and users may not want to store excessive metadata. + List sourceNames = + fields.stream() + .map(pf -> checkStateNotNull(table.schema().idToName().get(pf.sourceId()))) + .collect(Collectors.toList()); + Map configProps = + sourceNames.stream() + .collect(Collectors.toMap(s -> "write.metadata.metrics.column." + s, s -> "full")); + MetricsConfig configWithPartitionFields = MetricsConfig.fromProperties(configProps); + partitionMetrics = + getFileMetrics(inputFile, inferFormat(inputFile.location()), configWithPartitionFields); + } + + PartitionKey pk = new PartitionKey(table.spec(), table.schema()); + + for (int i = 0; i < fields.size(); i++) { + PartitionField field = fields.get(i); + Type type = table.schema().findType(field.sourceId()); + Transform transform = field.transform(); + + // Make a best effort estimate by comparing the lower and upper transformed values. 
+ // If the transformed values are equal, assume that the DataFile's data safely + // aligns with the same partition. + // TODO(ahmedabu98): is comparing min/max safe enough? + // or should we compare ALL the records in a DF? + ByteBuffer lowerBytes = partitionMetrics.lowerBounds().get(field.sourceId()); + ByteBuffer upperBytes = partitionMetrics.upperBounds().get(field.sourceId()); + if (lowerBytes == null && upperBytes == null) { + pk.set(i, null); + continue; + } else if (lowerBytes == null || upperBytes == null) { + throw new UnknownPartitionException("Only one of the min/max was null"); + } + Object lowerTransformedValue = transformValue(transform, type, lowerBytes); + Object upperTransformedValue = transformValue(transform, type, upperBytes); + + if (!Objects.deepEquals(lowerTransformedValue, upperTransformedValue)) { + // The DataFile contains values that align to different partitions, so we cannot + // safely determine a partition. + // If we commit the DataFile with an incorrect partition, downstream queries may + // completely ignore it (due to Iceberg's smart partition scan-planning). + // We also cannot commit the DataFile with a "null" partition, because that will + // also get skipped by most queries. + // The safe thing to do is to output the file to DLQ.
+ throw new UnknownPartitionException("Min and max transformed values were not equal"); + } + + pk.set(i, lowerTransformedValue); + } + + return pk.toPath(); + } } - static class CommitFilesDoFn extends DoFn, Row> { + static class CommitFilesDoFn extends DoFn>, Row> { private final IcebergCatalogConfig catalogConfig; private final String identifier; private transient @MonotonicNonNull Table table = null; + private final String jobId; + private static final String COMMIT_ID_KEY = "beam.add-files-commit-id"; - public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) { + public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier, String jobId) { this.catalogConfig = catalogConfig; this.identifier = identifier; + this.jobId = jobId; } - @Setup + @StartBundle public void start() { - table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + if (table == null) { + table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } } @ProcessElement - public void process(@Element Iterable files, OutputReceiver output) { - if (!files.iterator().hasNext()) { + public void process( + @Element KV> files, + PaneInfo pane, + OutputReceiver output) { + String commitId = commitHash(files.getValue()); + Table table = checkStateNotNull(this.table); + if (shouldSkip(commitId)) { return; } - Table table = checkStateNotNull(this.table); int numFiles = 0; AppendFiles appendFiles = table.newAppend(); - for (SerializableDataFile file : files) { + for (SerializableDataFile file : files.getValue()) { DataFile df = file.createDataFile(table.specs()); appendFiles.appendFile(df); numFiles++; } - + appendFiles.set(COMMIT_ID_KEY, commitId); appendFiles.commit(); - numFilesAdded.inc(numFiles); Snapshot snapshot = table.currentSnapshot(); output.output(SnapshotInfo.fromSnapshot(snapshot).toRow()); + numFilesAdded.inc(numFiles); + } + + private String commitHash(Iterable files) { + int hash = 0; + for (SerializableDataFile 
file : files) { + hash = 31 * hash + file.getPath().hashCode(); + } + return String.valueOf(Math.abs(hash)); + } + + /** + * If the process call fails immediately after committing files, but before registering with the + * runner, then the runner will retry committing the same batch of files, possibly leading to data + * duplication. + * + *

To mitigate, we create a unique ID per commit and store it in the snapshot summary. We + * skip the pane's batch of files if we see a snapshot with the same unique ID. + */ + private boolean shouldSkip(String commitUID) { + Table table = checkStateNotNull(this.table); + table.refresh(); + + // check the last 10 snapshots to see if it contains the commit ID + int i = 0; + @Nullable Snapshot current = table.currentSnapshot(); + while (current != null && i < 10) { + Map summary = current.summary(); + if (summary != null && commitUID.equals(summary.get(COMMIT_ID_KEY))) { + return true; // commit already happened, we should skip + } + if (current.parentId() == null) { + break; + } + current = table.snapshot(current.parentId()); + i++; + } + + return false; } } @@ -295,8 +522,14 @@ public static FileFormat inferFormat(String path) { return FileFormat.AVRO; } - throw new UnknownFormat(); + throw new UnknownFormatException(); } - static class UnknownFormat extends IllegalArgumentException {} + static class UnknownFormatException extends IllegalArgumentException {} + + static class UnknownPartitionException extends IllegalStateException { + UnknownPartitionException(String msg) { + super(msg); + } + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java index ed3ae5bded5a..7e745468f969 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -21,6 +21,7 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -40,6 +41,8 @@ public AddFiles 
from(Configuration configuration) { configuration.getIcebergCatalog(), configuration.getTable(), configuration.getLocationPrefix(), + configuration.getPartitionFields(), + configuration.getTableProperties(), configuration.getAppendBatchSize(), frequency != null ? Duration.standardSeconds(frequency) : null); } @@ -81,9 +84,29 @@ public static Builder builder() { + "'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n" + "The provided prefix should go up until the partition information:%n" + "'gs://bucket/namespace/table/data/'.%n" - + "Required if the table is partitioned. ") + + "If not provided, will try determining each DataFile's partition from its metrics metadata.") public abstract @Nullable String getLocationPrefix(); + @SchemaFieldDescription( + "Fields used to create a partition spec that is applied when tables are created. For a field 'foo', " + + "the available partition transforms are:\n\n" + + "- `foo`\n" + + "- `truncate(foo, N)`\n" + + "- `bucket(foo, N)`\n" + + "- `hour(foo)`\n" + + "- `day(foo)`\n" + + "- `month(foo)`\n" + + "- `year(foo)`\n" + + "- `void(foo)`\n\n" + + "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.") + public abstract @Nullable List getPartitionFields(); + + @SchemaFieldDescription( + "Iceberg table properties to be set on the table when it is created.\n" + + "For more information on table properties," + + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.") + public abstract @Nullable Map getTableProperties(); + @AutoValue.Builder public abstract static class Builder { public abstract Builder setTable(String table); @@ -98,6 +121,10 @@ public abstract static class Builder { public abstract Builder setLocationPrefix(String prefix); + public abstract Builder setPartitionFields(List fields); + + public abstract Builder setTableProperties(Map props); + public abstract Configuration build(); } diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java index 2b3117f8bf84..805cc0672940 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java @@ -68,10 +68,13 @@ class PartitionUtils { static PartitionSpec toPartitionSpec( @Nullable List fields, org.apache.beam.sdk.schemas.Schema beamSchema) { + return toPartitionSpec(fields, IcebergUtils.beamSchemaToIcebergSchema(beamSchema)); + } + + static PartitionSpec toPartitionSpec(@Nullable List fields, Schema schema) { if (fields == null) { return PartitionSpec.unpartitioned(); } - Schema schema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema); PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); for (String field : fields) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java index a2f48a64d482..b53ab3b943a1 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java @@ -17,14 +17,20 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; 
import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; @@ -40,6 +46,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; @@ -77,9 +84,11 @@ public class AddFilesTest { Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.required(2, "name", Types.StringType.get()), Types.NestedField.required(3, "age", Types.IntegerType.get())); - private final PartitionSpec spec = - PartitionSpec.builderFor(icebergSchema).identity("age").truncate("name", 3).build(); + private final List partitionFields = Arrays.asList("age", "truncate(name, 3)"); + private final PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, icebergSchema); private final PartitionKey wrapper = new PartitionKey(spec, icebergSchema); + private final Map tableProps = + ImmutableMap.of("write.metadata.metrics.default", "full", "foo", "bar"); private IcebergCatalogConfig catalogConfig; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @Rule public TestName testName = new TestName(); @@ -94,17 +103,10 @@ public void setup() throws Exception { // Root for existing data files: root = temp.getRoot().getAbsolutePath() + "/"; - // Set up a local Hadoop Catalog using the temporary folder + // Set up a local Hadoop Catalog catalog = new HadoopCatalog(new Configuration(), warehouse.location); tableId = TableIdentifier.of("default", testName.getMethodName()); - // Create an unpartitioned table by default, and full table metrics enabled - catalog.createTable( - tableId, - icebergSchema, - PartitionSpec.unpartitioned(), - ImmutableMap.of("write.metadata.metrics.default", "full")); - catalogConfig = IcebergCatalogConfig.builder() .setCatalogProperties( @@ -113,23 +115,16 @@ public void setup() 
throws Exception { } @Test - public void testPartitionedFilesAreCommitted() throws Exception { - testFilesAreCommittedToIceberg(true); + public void testAddPartitionedFiles() throws Exception { + testAddFilesWithPartitionPath(true); } @Test - public void testUnPartitionedFilesAreCommitted() throws Exception { - testFilesAreCommittedToIceberg(false); + public void testAddUnPartitionedFiles() throws Exception { + testAddFilesWithPartitionPath(false); } - public void testFilesAreCommittedToIceberg(boolean isPartitioned) throws Exception { - if (isPartitioned) { - // recreate table with the partition spec - catalog.dropTable(tableId); - catalog.createTable( - tableId, icebergSchema, spec, ImmutableMap.of("write.metadata.metrics.default", "full")); - } - + public void testAddFilesWithPartitionPath(boolean isPartitioned) throws Exception { // 1. Generate two local Parquet file. // Include Hive-like partition path if testing partition case String partitionPath1 = isPartitioned ? "age=20/name_trunc=Mar/" : ""; @@ -162,8 +157,10 @@ public void testFilesAreCommittedToIceberg(boolean isPartitioned) throws Excepti catalogConfig, tableId.toString(), isPartitioned ? root : null, + isPartitioned ? partitionFields : null, + tableProps, 2, // trigger at 2 files - Duration.standardSeconds(1))); + Duration.standardSeconds(10))); // 4. Validate PCollection Outputs PAssert.that(output.get("errors")).empty(); @@ -171,8 +168,10 @@ public void testFilesAreCommittedToIceberg(boolean isPartitioned) throws Excepti // 5. Run the pipeline pipeline.run().waitUntilFinish(); - // 6. Validate the Iceberg Table state directly + // 6. Validate the Iceberg Table was created with the correct spec and properties Table table = catalog.loadTable(tableId); + tableProps.forEach((key, value) -> assertThat(table.properties(), hasEntry(key, value))); + assertEquals(isPartitioned ? 
spec : PartitionSpec.unpartitioned(), table.spec()); // Check that we have exactly 1 snapshot with 2 files assertEquals(1, Iterables.size(table.snapshots())); @@ -209,6 +208,111 @@ public void testFilesAreCommittedToIceberg(boolean isPartitioned) throws Excepti assertEquals(writtenDf2.partition(), addedDf2.partition()); } + @Test + public void testAddFilesWithPartitionFromMetrics() throws IOException { + // 1. Generate local Parquet files with no directory structure. + String file1 = root + "data1.parquet"; + DataWriter writer = createWriter(file1); + writer.write(record(1, "Mark", 20)); + writer.write(record(2, "Martin", 20)); + writer.close(); + PartitionData expectedPartition1 = new PartitionData(spec.partitionType()); + expectedPartition1.set(0, 20); + expectedPartition1.set(1, "Mar"); + + String file2 = root + "data2.parquet"; + DataWriter writer2 = createWriter(file2); + writer2.write(record(3, "Samantha", 25)); + writer2.write(record(4, "Sammy", 25)); + writer2.close(); + PartitionData expectedPartition2 = new PartitionData(spec.partitionType()); + expectedPartition2.set(0, 25); + expectedPartition2.set(1, "Sam"); + + // Also create a "bad" DataFile, containing values that correspond to different partitions + // This file should get output to the DLQ, because we cannot determine its partition + String file3 = root + "data3.parquet"; + DataWriter writer3 = createWriter(file3); + writer3.write(record(5, "Johnny", 25)); + writer3.write(record(6, "Yaseen", 32)); + writer3.close(); + + // 2. Setup the input PCollection + Row row1 = Row.withSchema(INPUT_SCHEMA).addValue(file1).build(); + Row row2 = Row.withSchema(INPUT_SCHEMA).addValue(file2).build(); + Row row3 = Row.withSchema(INPUT_SCHEMA).addValue(file3).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(row1, row2, row3).withRowSchema(INPUT_SCHEMA)); + + // 3. 
Apply the transform (Trigger aggressively for testing) + PCollectionRowTuple output = + PCollectionRowTuple.of("input", inputFiles) + .apply( + new AddFiles( + catalogConfig, + tableId.toString(), + null, // no prefix, so determine partition from DF metrics + partitionFields, + tableProps, + 2, // trigger at 2 files + Duration.standardSeconds(10))); + + // 4. There should be an error for File3, because its partition could not be determined + PAssert.that(output.get("errors")) + .satisfies( + errorRows -> { + Row errorRow = Iterables.getOnlyElement(errorRows); + checkState( + errorRow.getSchema().equals(AddFiles.ERROR_SCHEMA) + && file3.equals(errorRow.getString(0)) + && checkStateNotNull(errorRow.getString(1)) + .startsWith(AddFiles.ConvertToDataFile.UNKNOWN_PARTITION_ERROR)); + return null; + }); + + // 5. Run the pipeline + pipeline.run().waitUntilFinish(); + + // 6. Validate the Iceberg Table was created with the correct spec and properties + Table table = catalog.loadTable(tableId); + tableProps.forEach((key, value) -> assertThat(table.properties(), hasEntry(key, value))); + assertEquals(spec, table.spec()); + + // Check that we have exactly 1 snapshot with 2 files + assertEquals(1, Iterables.size(table.snapshots())); + + List addedFiles = + Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io())); + assertEquals(2, addedFiles.size()); + + // Verify file paths + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data1.parquet"))); + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data2.parquet"))); + + // check metrics metadata is preserved + DataFile writtenDf1 = writer.toDataFile(); + DataFile writtenDf2 = writer2.toDataFile(); + DataFile addedDf1 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data1.parquet")) + .collect(Collectors.toList())); + DataFile addedDf2 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> 
df.location().contains("data2.parquet")) + .collect(Collectors.toList())); + + assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds()); + assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds()); + assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds()); + assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds()); + + // check partition metadata is preserved + assertEquals(expectedPartition1, addedDf1.partition()); + assertEquals(expectedPartition2, addedDf2.partition()); + } + @Test public void testStreamingAdds() throws IOException { List paths = new ArrayList<>(); @@ -241,6 +345,8 @@ public void testStreamingAdds() throws IOException { catalogConfig, tableId.toString(), null, + null, + null, 10, // trigger at 10 files Duration.standardSeconds(5))); pipeline.run().waitUntilFinish(); @@ -261,6 +367,7 @@ public void testStreamingAdds() throws IOException { @Test public void testUnknownFormatErrors() throws Exception { + catalog.createTable(tableId, icebergSchema); // Create a dummy text file (unsupported extension) File txtFile = temp.newFile("unsupported.txt"); txtFile.createNewFile(); @@ -269,13 +376,13 @@ public void testUnknownFormatErrors() throws Exception { PCollection inputFiles = pipeline.apply("Create Input", Create.of(badRow).withRowSchema(INPUT_SCHEMA)); - AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), null, 1, null); + AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), null, null, null, 1, null); PCollectionRowTuple outputTuple = PCollectionRowTuple.of("input", inputFiles).apply(addFiles); // Validate the file ended up in the errors PCollection with the correct schema PAssert.that(outputTuple.get("errors")) .containsInAnyOrder( - Row.withSchema(AddFiles.ConvertToDataFile.ERROR_SCHEMA) + Row.withSchema(AddFiles.ERROR_SCHEMA) .addValues(txtFile.getAbsolutePath(), "Could not determine the file's format") .build()); @@ -300,12 +407,13 @@ public void testPartitionPrefixErrors() throws Exception 
{ pipeline.apply("Create Input", Create.of(row1).withRowSchema(INPUT_SCHEMA)); // Notice locationPrefix is "some/prefix/" but the absolute path doesn't start with it - AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), "some/prefix/", 1, null); + AddFiles addFiles = + new AddFiles(catalogConfig, tableId.toString(), "some/prefix/", null, null, 1, null); PCollectionRowTuple outputTuple = PCollectionRowTuple.of("input", inputFiles).apply(addFiles); PAssert.that(outputTuple.get("errors")) .containsInAnyOrder( - Row.withSchema(AddFiles.ConvertToDataFile.ERROR_SCHEMA) + Row.withSchema(AddFiles.ERROR_SCHEMA) .addValues(file1, "File path did not start with the specified prefix") .build()); diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 24c18f382817..e175f646d599 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -970,10 +970,10 @@ public Progress getProgress() { } } - private static class BeamParquetInputFile implements InputFile { + public static class BeamParquetInputFile implements InputFile { private final SeekableByteChannel seekableByteChannel; - BeamParquetInputFile(SeekableByteChannel seekableByteChannel) { + public BeamParquetInputFile(SeekableByteChannel seekableByteChannel) { this.seekableByteChannel = seekableByteChannel; } From f507d0acdaefd10515fccbc66ba2140186d2b9f0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 7 Mar 2026 15:43:28 -0500 Subject: [PATCH 5/7] spotless --- sdks/java/io/iceberg/build.gradle | 1 - .../apache/beam/sdk/io/iceberg/AddFiles.java | 32 +++++++++---------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index b185a962415f..9cc8a295c7df 100644 --- 
a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -51,7 +51,6 @@ dependencies { implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" implementation "org.apache.parquet:parquet-hadoop:$parquet_version" - implementation "org.apache.parquet:parquet-common:$parquet_version" implementation project(":sdks:java:io:parquet") implementation "org.apache.orc:orc-core:$orc_version" implementation "org.apache.iceberg:iceberg-core:$iceberg_version" diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index afdc92ba186d..77b72307b3ee 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -17,19 +17,6 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; -import static org.apache.beam.sdk.metrics.Metrics.counter; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.stream.Collectors; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult; @@ -89,6 +76,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; + +import static 
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.metrics.Metrics.counter; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + /** * A transform that takes in a stream of file paths, converts them to Iceberg {@link DataFile}s with * partition metadata and metrics, then commits them to an Iceberg {@link Table}. @@ -283,9 +284,8 @@ public void process(@Element String filePath, MultiOutputReceiver output) throws output.get(DATA_FILES).output(SerializableDataFile.from(df, partitionPath)); } - private T transformValue(Transform transform, Type type, ByteBuffer bytes) { - S value = Conversions.fromByteBuffer(type, bytes); - return transform.bind(type).apply(value); + private T transformValue(Transform transform, Type type, ByteBuffer bytes) { + return transform.bind(type).apply(Conversions.fromByteBuffer(type, bytes)); } private Table getOrCreateTable(org.apache.iceberg.Schema schema) { From 904599690c74caa2fca0c490a384ac0025b30e9e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 10 Mar 2026 11:23:38 -0400 Subject: [PATCH 6/7] add batch route as well --- .../apache/beam/sdk/io/iceberg/AddFiles.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index 77b72307b3ee..aa2619d8b8fb 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.GroupByKey; import 
org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; @@ -89,6 +90,7 @@ import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; import static org.apache.beam.sdk.metrics.Metrics.counter; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED; /** * A transform that takes in a stream of file paths, converts them to Iceberg {@link DataFile}s with @@ -172,14 +174,21 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { throw new RuntimeException(e); } - PCollection snapshots = + PCollection> keyedFiles = dataFiles .get(DATA_FILES) .setCoder(sdfSchema) - .apply("AddStaticKey", WithKeys.of((Void) null)) - .apply( + .apply("AddStaticKey", WithKeys.of((Void) null)); + + PCollection>> groupedFiles = + keyedFiles.isBounded().equals(BOUNDED) + ? keyedFiles.apply(GroupByKey.create()) + : keyedFiles.apply( GroupIntoBatches.ofSize(numFilesTrigger) - .withMaxBufferingDuration(intervalTrigger)) + .withMaxBufferingDuration(intervalTrigger)); + + PCollection snapshots = + groupedFiles .apply( "CommitFilesToIceberg", ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier, jobId))) From d4a0db1c27d8e54c12d5adaede453e72e98fe34b Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 10 Mar 2026 11:34:29 -0400 Subject: [PATCH 7/7] spotless --- .../apache/beam/sdk/io/iceberg/AddFiles.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index aa2619d8b8fb..3919de759d2e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -17,6 +17,20 @@ */ 
package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.metrics.Metrics.counter; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult; @@ -77,21 +91,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.stream.Collectors; - -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; -import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; -import static org.apache.beam.sdk.metrics.Metrics.counter; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; -import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED; - /** * A transform that takes in a stream of file paths, converts them to Iceberg {@link DataFile}s with * partition metadata and metrics, then commits them to an Iceberg {@link Table}.