diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index c188e4f63853..9cc8a295c7df 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -51,6 +51,7 @@ dependencies { implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" implementation "org.apache.parquet:parquet-hadoop:$parquet_version" + implementation project(":sdks:java:io:parquet") implementation "org.apache.orc:orc-core:$orc_version" implementation "org.apache.iceberg:iceberg-core:$iceberg_version" implementation "org.apache.iceberg:iceberg-api:$iceberg_version" @@ -74,11 +75,11 @@ dependencies { testImplementation library.java.bigdataoss_gcsio testImplementation library.java.bigdataoss_util_hadoop testImplementation "org.apache.parquet:parquet-avro:$parquet_version" - testImplementation "org.apache.parquet:parquet-common:$parquet_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(":sdks:java:extensions:google-cloud-platform-core") testImplementation library.java.junit + testImplementation library.java.hamcrest // Hive catalog test dependencies testImplementation project(path: ":sdks:java:io:iceberg:hive") diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java new file mode 100644 index 000000000000..3919de759d2e --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.metrics.Metrics.counter; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.BeamParquetInputFile; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.GroupByKey; +import 
org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.schema.MessageType; +import 
org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A transform that takes in a stream of file paths, converts them to Iceberg {@link DataFile}s with + * partition metadata and metrics, then commits them to an Iceberg {@link Table}. + */ +public class AddFiles extends SchemaTransform { + private static final Duration DEFAULT_TRIGGER_INTERVAL = Duration.standardMinutes(10); + private static final Counter numFilesAdded = counter(AddFiles.class, "numFilesAdded"); + private static final Counter numErrorFiles = counter(AddFiles.class, "numErrorFiles"); + private static final Logger LOG = LoggerFactory.getLogger(AddFiles.class); + private static final int DEFAULT_FILES_TRIGGER = 100_000; + static final Schema ERROR_SCHEMA = + Schema.builder().addStringField("file").addStringField("error").build(); + private final IcebergCatalogConfig catalogConfig; + private final String tableIdentifier; + private final Duration intervalTrigger; + private final int numFilesTrigger; + private final @Nullable String locationPrefix; + private final @Nullable List partitionFields; + private final @Nullable Map tableProps; + private final String jobId = UUID.randomUUID().toString(); + + public AddFiles( + IcebergCatalogConfig catalogConfig, + String tableIdentifier, + @Nullable String locationPrefix, + @Nullable List partitionFields, + @Nullable Map tableProps, + @Nullable Integer numFilesTrigger, + @Nullable Duration intervalTrigger) { + this.catalogConfig = catalogConfig; + this.tableIdentifier = tableIdentifier; + this.partitionFields = partitionFields; + this.tableProps = tableProps; + this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; + this.numFilesTrigger = numFilesTrigger != null ? 
numFilesTrigger : DEFAULT_FILES_TRIGGER; + this.locationPrefix = locationPrefix; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + LOG.info( + "AddFiles configured to commit after accumulating {} files, or after {} seconds.", + numFilesTrigger, + intervalTrigger.getStandardSeconds()); + if (!Strings.isNullOrEmpty(locationPrefix)) { + LOG.info( + "AddFiles configured to build partition metadata after the prefix: '{}'", locationPrefix); + } + + Schema inputSchema = input.getSinglePCollection().getSchema(); + Preconditions.checkState( + inputSchema.getFieldCount() == 1 + && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING), + "Incoming Row Schema must contain only one field of type String. Instead, got schema: %s", + inputSchema); + + PCollectionTuple dataFiles = + input + .getSinglePCollection() + .apply("Filter empty paths", Filter.by(row -> row.getString(0) != null)) + .apply( + "ExtractPaths", + MapElements.into(TypeDescriptors.strings()) + .via(row -> checkStateNotNull(row.getString(0)))) + .apply(Redistribute.arbitrarily()) + .apply( + "ConvertToDataFiles", + ParDo.of( + new ConvertToDataFile( + catalogConfig, + tableIdentifier, + locationPrefix, + partitionFields, + tableProps)) + .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS))); + SchemaCoder sdfSchema; + try { + sdfSchema = SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + + PCollection> keyedFiles = + dataFiles + .get(DATA_FILES) + .setCoder(sdfSchema) + .apply("AddStaticKey", WithKeys.of((Void) null)); + + PCollection>> groupedFiles = + keyedFiles.isBounded().equals(BOUNDED) + ? 
keyedFiles.apply(GroupByKey.create()) + : keyedFiles.apply( + GroupIntoBatches.ofSize(numFilesTrigger) + .withMaxBufferingDuration(intervalTrigger)); + + PCollection snapshots = + groupedFiles + .apply( + "CommitFilesToIceberg", + ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier, jobId))) + .setRowSchema(SnapshotInfo.getSchema()); + + return PCollectionRowTuple.of( + "snapshots", snapshots, "errors", dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA)); + } + + static class ConvertToDataFile extends DoFn { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + public static final TupleTag ERRORS = new TupleTag<>(); + public static final TupleTag DATA_FILES = new TupleTag<>(); + private final @Nullable String prefix; + private final @Nullable List partitionFields; + private final @Nullable Map tableProps; + private transient @MonotonicNonNull Table table; + + public ConvertToDataFile( + IcebergCatalogConfig catalogConfig, + String identifier, + @Nullable String prefix, + @Nullable List partitionFields, + @Nullable Map tableProps) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.prefix = prefix; + this.partitionFields = partitionFields; + this.tableProps = tableProps; + } + + private static final String PREFIX_ERROR = "File path did not start with the specified prefix"; + private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; + static final String UNKNOWN_PARTITION_ERROR = "Could not determine the file's partition"; + + @ProcessElement + public void process(@Element String filePath, MultiOutputReceiver output) throws IOException { + FileFormat format; + try { + format = inferFormat(filePath); + } catch (UnknownFormatException e) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); + numErrorFiles.inc(); + return; + } + + if (table == null) { + table = getOrCreateTable(getSchema(filePath, format)); 
+ } + + // Check if the file path contains the provided prefix + if (table.spec().isPartitioned() + && !Strings.isNullOrEmpty(prefix) + && !filePath.startsWith(checkStateNotNull(prefix))) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); + numErrorFiles.inc(); + return; + } + + InputFile inputFile = table.io().newInputFile(filePath); + + Metrics metrics = getFileMetrics(inputFile, format, MetricsConfig.forTable(table)); + + // Figure out which partition this DataFile should go to + String partitionPath; + if (table.spec().isUnpartitioned()) { + partitionPath = ""; + } else if (!Strings.isNullOrEmpty(prefix)) { + // option 1: use directory structure to determine partition + // Note: we don't validate the DataFile content here + partitionPath = getPartitionFromFilePath(filePath); + } else { + try { + // option 2: examine DataFile min/max statistics to determine partition + partitionPath = getPartitionFromMetrics(metrics, inputFile); + } catch (UnknownPartitionException e) { + output + .get(ERRORS) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(filePath, UNKNOWN_PARTITION_ERROR + ": " + e.getMessage()) + .build()); + numErrorFiles.inc(); + return; + } + } + + DataFile df = + DataFiles.builder(table.spec()) + .withPath(filePath) + .withFormat(format) + .withMetrics(metrics) + .withFileSizeInBytes(inputFile.getLength()) + .withPartitionPath(partitionPath) + .build(); + + output.get(DATA_FILES).output(SerializableDataFile.from(df, partitionPath)); + } + + private T transformValue(Transform transform, Type type, ByteBuffer bytes) { + return transform.bind(type).apply(Conversions.fromByteBuffer(type, bytes)); + } + + private Table getOrCreateTable(org.apache.iceberg.Schema schema) { + PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); + try { + return tableProps == null + ? 
catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) + : catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); + } catch (AlreadyExistsException e) { // if table already exists, just load it + return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + + /** + * We don't have a table yet, so we don't know which FileIO to use to read these files. Instead, + * we use Beam's FileSystem utilities to read the file and extract its schema to create the + * table + */ + private static org.apache.iceberg.Schema getSchema(String filePath, FileFormat format) + throws IOException { + Preconditions.checkArgument( + format.equals(FileFormat.PARQUET), "Table creation is only supported for Parquet files."); + FileSystems.registerFileSystemsOnce(PipelineOptionsFactory.create()); + + MatchResult result = FileSystems.match(filePath); + ResourceId resourceId = Iterables.getOnlyElement(result.metadata()).resourceId(); + Compression compression = Compression.detect(checkStateNotNull(resourceId.getFilename())); + SeekableByteChannel channel = + (SeekableByteChannel) compression.readDecompressed(FileSystems.open(resourceId)); + BeamParquetInputFile file = new BeamParquetInputFile(channel); + + try (ParquetFileReader reader = ParquetFileReader.open(file)) { + MessageType messageType = reader.getFooter().getFileMetaData().getSchema(); + return ParquetSchemaUtil.convert(messageType); + } + } + + private String getPartitionFromFilePath(String filePath) { + if (checkStateNotNull(table).spec().isUnpartitioned()) { + return ""; + } + String partitionPath = filePath.substring(checkStateNotNull(prefix).length()); + int lastSlashIndex = partitionPath.lastIndexOf('/'); + + return lastSlashIndex > 0 ? partitionPath.substring(0, lastSlashIndex) : ""; + } + + /** + * Examines the min/max values of each partition column to determine the destination partition. + * + *

If the transformed min/max values are not equal for any given column, we won't be able to + * determine the partition. In this case, we output the DataFile to the DLQ, because assigning + * an incorrect partition may lead to it being hidden from some queries. + */ + private String getPartitionFromMetrics(Metrics metrics, InputFile inputFile) + throws UnknownPartitionException { + Table table = checkStateNotNull(this.table); + List fields = table.spec().fields(); + List sourceIds = + fields.stream().map(PartitionField::sourceId).collect(Collectors.toList()); + Metrics partitionMetrics; + // Check if metrics already includes partition columns (this is configured by table + // properties): + if (metrics.lowerBounds().keySet().containsAll(sourceIds) + && metrics.upperBounds().keySet().containsAll(sourceIds)) { + partitionMetrics = metrics; + } else { + // Otherwise, recollect metrics and ensure it includes all partition fields. + // Note: we don't attach these additional metrics to the DataFile because we can't assume + // that's in the user's best interest. + // Some tables are very wide and users may not want to store excessive metadata. + List sourceNames = + fields.stream() + .map(pf -> checkStateNotNull(table.schema().idToName().get(pf.sourceId()))) + .collect(Collectors.toList()); + Map configProps = + sourceNames.stream() + .collect(Collectors.toMap(s -> "write.metadata.metrics.column." + s, s -> "full")); + MetricsConfig configWithPartitionFields = MetricsConfig.fromProperties(configProps); + partitionMetrics = + getFileMetrics(inputFile, inferFormat(inputFile.location()), configWithPartitionFields); + } + + PartitionKey pk = new PartitionKey(table.spec(), table.schema()); + + for (int i = 0; i < fields.size(); i++) { + PartitionField field = fields.get(i); + Type type = table.schema().findType(field.sourceId()); + Transform transform = field.transform(); + + // Make a best effort estimate by comparing the lower and upper transformed values. 
+ // If the transformed values are equal, assume that the DataFile's data safely + // aligns with the same partition. + // TODO(ahmedabu98): is comparing min/max safe enough? + // or should we compare ALL the records in a DF? + ByteBuffer lowerBytes = partitionMetrics.lowerBounds().get(field.sourceId()); + ByteBuffer upperBytes = partitionMetrics.upperBounds().get(field.sourceId()); + if (lowerBytes == null && upperBytes == null) { + pk.set(i, null); + continue; + } else if (lowerBytes == null || upperBytes == null) { + throw new UnknownPartitionException("Only one of the min/max was null"); + } + Object lowerTransformedValue = transformValue(transform, type, lowerBytes); + Object upperTransformedValue = transformValue(transform, type, upperBytes); + + if (!Objects.deepEquals(lowerTransformedValue, upperTransformedValue)) { + // The DataFile contains values that align to different partitions, so we cannot + // safely determine a partition. + // If we commit the DataFile with an incorrect partition, downstream queries may + // completely ignore it (due to Iceberg's smart partition scan-planning). + // We also cannot commit the DataFile with a "null" partition, because that will + // also get skipped by most queries. + // The safe thing to do is to output the file to DLQ. 
+ throw new UnknownPartitionException("Min and max transformed values were not equal"); + } + + pk.set(i, lowerTransformedValue); + } + + return pk.toPath(); + } + } + + static class CommitFilesDoFn extends DoFn>, Row> { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + private transient @MonotonicNonNull Table table = null; + private final String jobId; + private static final String COMMIT_ID_KEY = "beam.add-files-commit-id"; + + public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier, String jobId) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.jobId = jobId; + } + + @StartBundle + public void start() { + if (table == null) { + table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + + @ProcessElement + public void process( + @Element KV> files, + PaneInfo pane, + OutputReceiver output) { + String commitId = commitHash(files.getValue()); + Table table = checkStateNotNull(this.table); + if (shouldSkip(commitId)) { + return; + } + + int numFiles = 0; + AppendFiles appendFiles = table.newAppend(); + for (SerializableDataFile file : files.getValue()) { + DataFile df = file.createDataFile(table.specs()); + appendFiles.appendFile(df); + numFiles++; + } + appendFiles.set(COMMIT_ID_KEY, commitId); + appendFiles.commit(); + + Snapshot snapshot = table.currentSnapshot(); + output.output(SnapshotInfo.fromSnapshot(snapshot).toRow()); + numFilesAdded.inc(numFiles); + } + + private String commitHash(Iterable files) { + int hash = 0; + for (SerializableDataFile file : files) { + hash = 31 * hash + file.getPath().hashCode(); + } + return String.valueOf(Math.abs(hash)); + } + + /** + * If the process call fails immediately after committing files, but before registering with the + * runner, then runner will retry committing the same batch of files, possibly leading to data + * duplication. + * + *

To mitigate, we create a unique ID per commit and store it in the snapshot summary. We + * skip the pane's batch of files if we see a snapshot with the same unique ID. + */ + private boolean shouldSkip(String commitUID) { + Table table = checkStateNotNull(this.table); + table.refresh(); + + // check the last 10 snapshots to see if it contains the commit ID + int i = 0; + @Nullable Snapshot current = table.currentSnapshot(); + while (current != null && i < 10) { + Map summary = current.summary(); + if (summary != null && commitUID.equals(summary.get(COMMIT_ID_KEY))) { + return true; // commit already happened, we should skip + } + if (current.parentId() == null) { + break; + } + current = table.snapshot(current.parentId()); + i++; + } + + return false; + } + } + + @SuppressWarnings("argument") + public static Metrics getFileMetrics(InputFile file, FileFormat format, MetricsConfig config) { + switch (format) { + case PARQUET: + return ParquetUtil.fileMetrics(file, config); + case ORC: + return OrcMetrics.fromInputFile(file, config); + case AVRO: + return new Metrics(Avro.rowCount(file), null, null, null, null); + default: + throw new UnsupportedOperationException("Unsupported format: " + format); + } + } + + public static FileFormat inferFormat(String path) { + String lowerPath = path.toLowerCase(); + + if (lowerPath.endsWith(".parquet") || lowerPath.endsWith(".pqt")) { + return FileFormat.PARQUET; + } else if (lowerPath.endsWith(".orc")) { + return FileFormat.ORC; + } else if (lowerPath.endsWith(".avro")) { + return FileFormat.AVRO; + } + + throw new UnknownFormatException(); + } + + static class UnknownFormatException extends IllegalArgumentException {} + + static class UnknownPartitionException extends IllegalStateException { + UnknownPartitionException(String msg) { + super(msg); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java new file mode 100644 index 000000000000..7e745468f969 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +@AutoService(SchemaTransformProvider.class) +public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider { + @Override + public AddFiles from(Configuration configuration) { + @Nullable Integer frequency = configuration.getTriggeringFrequencySeconds(); + + return new AddFiles( + configuration.getIcebergCatalog(), + configuration.getTable(), + configuration.getLocationPrefix(), + configuration.getPartitionFields(), + configuration.getTableProperties(), + configuration.getAppendBatchSize(), + frequency != null ? 
Duration.standardSeconds(frequency) : null); + } + + @Override + public String identifier() { + return "beam:schematransform:iceberg_add_files:v1"; + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + public static Builder builder() { + return new AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("A fully-qualified table identifier.") + public abstract String getTable(); + + @SchemaFieldDescription("Properties used to set up the Iceberg catalog.") + public abstract @Nullable Map getCatalogProperties(); + + @SchemaFieldDescription("Properties passed to the Hadoop ") + public abstract @Nullable Map getConfigProperties(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the frequency at which incoming files are appended. Defaults to 600 (10 minutes). " + + "A commit is triggered when either this or append batch size is reached.") + public abstract @Nullable Integer getTriggeringFrequencySeconds(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the desired number of appended files per commit. Defaults to 100,000 files. " + + "A commit is triggered when either this or append triggering interval is reached.") + public abstract @Nullable Integer getAppendBatchSize(); + + @SchemaFieldDescription( + "The prefix shared among all partitions. For example, a data file may have the following" + + " location:%n" + + "'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n" + + "The provided prefix should go up until the partition information:%n" + + "'gs://bucket/namespace/table/data/'.%n" + + "If not provided, will try determining each DataFile's partition from its metrics metadata.") + public abstract @Nullable String getLocationPrefix(); + + @SchemaFieldDescription( + "Fields used to create a partition spec that is applied when tables are created. 
For a field 'foo', " + + "the available partition transforms are:\n\n" + + "- `foo`\n" + + "- `truncate(foo, N)`\n" + + "- `bucket(foo, N)`\n" + + "- `hour(foo)`\n" + + "- `day(foo)`\n" + + "- `month(foo)`\n" + + "- `year(foo)`\n" + + "- `void(foo)`\n\n" + + "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.") + public abstract @Nullable List getPartitionFields(); + + @SchemaFieldDescription( + "Iceberg table properties to be set on the table when it is created.\n" + + "For more information on table properties," + + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.") + public abstract @Nullable Map getTableProperties(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTable(String table); + + public abstract Builder setCatalogProperties(Map catalogProperties); + + public abstract Builder setConfigProperties(Map confProperties); + + public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds); + + public abstract Builder setAppendBatchSize(Integer size); + + public abstract Builder setLocationPrefix(String prefix); + + public abstract Builder setPartitionFields(List fields); + + public abstract Builder setTableProperties(Map props); + + public abstract Configuration build(); + } + + public IcebergCatalogConfig getIcebergCatalog() { + return IcebergCatalogConfig.builder() + .setCatalogProperties(getCatalogProperties()) + .setConfigProperties(getConfigProperties()) + .build(); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java index 2b3117f8bf84..805cc0672940 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java @@ 
-68,10 +68,13 @@ class PartitionUtils { static PartitionSpec toPartitionSpec( @Nullable List fields, org.apache.beam.sdk.schemas.Schema beamSchema) { + return toPartitionSpec(fields, IcebergUtils.beamSchemaToIcebergSchema(beamSchema)); + } + + static PartitionSpec toPartitionSpec(@Nullable List fields, Schema schema) { if (fields == null) { return PartitionSpec.unpartitioned(); } - Schema schema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema); PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); for (String field : fields) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java new file mode 100644 index 000000000000..b53ab3b943a1 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.parquet.Parquet; +import 
org.apache.iceberg.types.Types; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class AddFilesTest { + @Rule public TemporaryFolder temp = new TemporaryFolder(); + private String root; + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private HadoopCatalog catalog; + private TableIdentifier tableId; + private final org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "age", Types.IntegerType.get())); + private final List partitionFields = Arrays.asList("age", "truncate(name, 3)"); + private final PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, icebergSchema); + private final PartitionKey wrapper = new PartitionKey(spec, icebergSchema); + private final Map tableProps = + ImmutableMap.of("write.metadata.metrics.default", "full", "foo", "bar"); + private IcebergCatalogConfig catalogConfig; + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Rule public TestName testName = new TestName(); + + @Rule + public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + private static final Schema INPUT_SCHEMA = Schema.builder().addStringField("filepath").build(); + + @Before + public void setup() throws Exception { + // Root for existing data files: + root = temp.getRoot().getAbsolutePath() + "/"; + + // Set up a local Hadoop Catalog + catalog = new HadoopCatalog(new Configuration(), warehouse.location); + tableId = TableIdentifier.of("default", 
testName.getMethodName()); + + catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogProperties( + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)) + .build(); + } + + @Test + public void testAddPartitionedFiles() throws Exception { + testAddFilesWithPartitionPath(true); + } + + @Test + public void testAddUnPartitionedFiles() throws Exception { + testAddFilesWithPartitionPath(false); + } + + public void testAddFilesWithPartitionPath(boolean isPartitioned) throws Exception { + // 1. Generate two local Parquet file. + // Include Hive-like partition path if testing partition case + String partitionPath1 = isPartitioned ? "age=20/name_trunc=Mar/" : ""; + String file1 = root + partitionPath1 + "data1.parquet"; + wrapper.wrap(record(-1, "Mar", 20)); + DataWriter writer = createWriter(file1, isPartitioned ? wrapper.copy() : null); + writer.write(record(1, "Mark", 20)); + writer.write(record(2, "Martin", 20)); + writer.close(); + + String partitionPath2 = isPartitioned ? "age=25/name_trunc=Sam/" : ""; + String file2 = root + partitionPath2 + "data2.parquet"; + wrapper.wrap(record(-1, "Sam", 25)); + DataWriter writer2 = createWriter(file2, isPartitioned ? wrapper.copy() : null); + writer2.write(record(3, "Samantha", 25)); + writer2.write(record(4, "Sammy", 25)); + writer2.close(); + + // 2. Setup the input PCollection + Row row1 = Row.withSchema(INPUT_SCHEMA).addValue(file1).build(); + Row row2 = Row.withSchema(INPUT_SCHEMA).addValue(file2).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(row1, row2).withRowSchema(INPUT_SCHEMA)); + + // 3. Apply the transform (Trigger aggressively for testing) + PCollectionRowTuple output = + PCollectionRowTuple.of("input", inputFiles) + .apply( + new AddFiles( + catalogConfig, + tableId.toString(), + isPartitioned ? root : null, + isPartitioned ? partitionFields : null, + tableProps, + 2, // trigger at 2 files + Duration.standardSeconds(10))); + + // 4. 
Validate PCollection Outputs + PAssert.that(output.get("errors")).empty(); + + // 5. Run the pipeline + pipeline.run().waitUntilFinish(); + + // 6. Validate the Iceberg Table was created with the correct spec and properties + Table table = catalog.loadTable(tableId); + tableProps.forEach((key, value) -> assertThat(table.properties(), hasEntry(key, value))); + assertEquals(isPartitioned ? spec : PartitionSpec.unpartitioned(), table.spec()); + + // Check that we have exactly 1 snapshot with 2 files + assertEquals(1, Iterables.size(table.snapshots())); + + List addedFiles = + Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io())); + assertEquals(2, addedFiles.size()); + + // Verify file paths + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data1.parquet"))); + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data2.parquet"))); + + // check metrics metadata is preserved + DataFile writtenDf1 = writer.toDataFile(); + DataFile writtenDf2 = writer2.toDataFile(); + DataFile addedDf1 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data1.parquet")) + .collect(Collectors.toList())); + DataFile addedDf2 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data2.parquet")) + .collect(Collectors.toList())); + + assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds()); + assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds()); + assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds()); + assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds()); + + // check partition metadata is preserved + assertEquals(writtenDf1.partition(), addedDf1.partition()); + assertEquals(writtenDf2.partition(), addedDf2.partition()); + } + + @Test + public void testAddFilesWithPartitionFromMetrics() throws IOException { + // 1. Generate local Parquet files with no directory structure. 
+ String file1 = root + "data1.parquet"; + DataWriter writer = createWriter(file1); + writer.write(record(1, "Mark", 20)); + writer.write(record(2, "Martin", 20)); + writer.close(); + PartitionData expectedPartition1 = new PartitionData(spec.partitionType()); + expectedPartition1.set(0, 20); + expectedPartition1.set(1, "Mar"); + + String file2 = root + "data2.parquet"; + DataWriter writer2 = createWriter(file2); + writer2.write(record(3, "Samantha", 25)); + writer2.write(record(4, "Sammy", 25)); + writer2.close(); + PartitionData expectedPartition2 = new PartitionData(spec.partitionType()); + expectedPartition2.set(0, 25); + expectedPartition2.set(1, "Sam"); + + // Also create a "bad" DataFile, containing values that correspond to different partitions + // This file should get output to the DLQ, because we cannot determine its partition + String file3 = root + "data3.parquet"; + DataWriter writer3 = createWriter(file3); + writer3.write(record(5, "Johnny", 25)); + writer3.write(record(6, "Yaseen", 32)); + writer3.close(); + + // 2. Setup the input PCollection + Row row1 = Row.withSchema(INPUT_SCHEMA).addValue(file1).build(); + Row row2 = Row.withSchema(INPUT_SCHEMA).addValue(file2).build(); + Row row3 = Row.withSchema(INPUT_SCHEMA).addValue(file3).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(row1, row2, row3).withRowSchema(INPUT_SCHEMA)); + + // 3. Apply the transform (Trigger aggressively for testing) + PCollectionRowTuple output = + PCollectionRowTuple.of("input", inputFiles) + .apply( + new AddFiles( + catalogConfig, + tableId.toString(), + null, // no prefix, so determine partition from DF metrics + partitionFields, + tableProps, + 2, // trigger at 2 files + Duration.standardSeconds(10))); + + // 4. 
There should be an error for File3, because its partition could not be determined + PAssert.that(output.get("errors")) + .satisfies( + errorRows -> { + Row errorRow = Iterables.getOnlyElement(errorRows); + checkState( + errorRow.getSchema().equals(AddFiles.ERROR_SCHEMA) + && file3.equals(errorRow.getString(0)) + && checkStateNotNull(errorRow.getString(1)) + .startsWith(AddFiles.ConvertToDataFile.UNKNOWN_PARTITION_ERROR)); + return null; + }); + + // 5. Run the pipeline + pipeline.run().waitUntilFinish(); + + // 6. Validate the Iceberg Table was created with the correct spec and properties + Table table = catalog.loadTable(tableId); + tableProps.forEach((key, value) -> assertThat(table.properties(), hasEntry(key, value))); + assertEquals(spec, table.spec()); + + // Check that we have exactly 1 snapshot with 2 files + assertEquals(1, Iterables.size(table.snapshots())); + + List addedFiles = + Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io())); + assertEquals(2, addedFiles.size()); + + // Verify file paths + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data1.parquet"))); + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data2.parquet"))); + + // check metrics metadata is preserved + DataFile writtenDf1 = writer.toDataFile(); + DataFile writtenDf2 = writer2.toDataFile(); + DataFile addedDf1 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data1.parquet")) + .collect(Collectors.toList())); + DataFile addedDf2 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data2.parquet")) + .collect(Collectors.toList())); + + assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds()); + assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds()); + assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds()); + assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds()); + + // check partition metadata is preserved + 
assertEquals(expectedPartition1, addedDf1.partition()); + assertEquals(expectedPartition2, addedDf2.partition()); + } + + @Test + public void testStreamingAdds() throws IOException { + List paths = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + String file = String.format("%sdata_%s.parquet", root, i); + DataWriter writer = createWriter(file); + writer.write(record(1, "SomeName", 30)); + writer.close(); + paths.add(Row.withSchema(INPUT_SCHEMA).addValue(file).build()); + } + + PCollection files = + pipeline.apply( + TestStream.create(INPUT_SCHEMA) + .addElements( + paths.get(0), paths.subList(1, 15).toArray(new Row[] {})) // should commit twice + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements( + paths.get(15), + paths.subList(16, 40).toArray(new Row[] {})) // should commit 3 times + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements( + paths.get(40), + paths.subList(41, 45).toArray(new Row[] {})) // should commit once + .advanceWatermarkToInfinity()); + + PCollectionRowTuple.of("input", files) + .apply( + new AddFiles( + catalogConfig, + tableId.toString(), + null, + null, + null, + 10, // trigger at 10 files + Duration.standardSeconds(5))); + pipeline.run().waitUntilFinish(); + + Table table = catalog.loadTable(tableId); + + List snapshots = Lists.newArrayList(table.snapshots()); + snapshots.sort(Comparator.comparingLong(Snapshot::timestampMillis)); + + assertEquals(6, snapshots.size()); + assertEquals(10, Iterables.size(snapshots.get(0).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(1).addedDataFiles(table.io()))); + assertEquals(10, Iterables.size(snapshots.get(2).addedDataFiles(table.io()))); + assertEquals(10, Iterables.size(snapshots.get(3).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(4).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(5).addedDataFiles(table.io()))); + } + + @Test + public void testUnknownFormatErrors() 
throws Exception { + catalog.createTable(tableId, icebergSchema); + // Create a dummy text file (unsupported extension) + File txtFile = temp.newFile("unsupported.txt"); + txtFile.createNewFile(); + + Row badRow = Row.withSchema(INPUT_SCHEMA).addValue(txtFile.getAbsolutePath()).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(badRow).withRowSchema(INPUT_SCHEMA)); + + AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), null, null, null, 1, null); + PCollectionRowTuple outputTuple = PCollectionRowTuple.of("input", inputFiles).apply(addFiles); + + // Validate the file ended up in the errors PCollection with the correct schema + PAssert.that(outputTuple.get("errors")) + .containsInAnyOrder( + Row.withSchema(AddFiles.ERROR_SCHEMA) + .addValues(txtFile.getAbsolutePath(), "Could not determine the file's format") + .build()); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testPartitionPrefixErrors() throws Exception { + // Drop unpartitioned table and create a partitioned one + catalog.dropTable(tableId); + PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).identity("name").build(); + catalog.createTable(tableId, icebergSchema, spec); + + String file1 = root + "data1.parquet"; + wrapper.wrap(record(-1, "And", 30)); + DataWriter writer = createWriter(file1, wrapper.copy()); + writer.write(record(1, "Andrew", 30)); + writer.close(); + + Row row1 = Row.withSchema(INPUT_SCHEMA).addValue(file1).build(); + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(row1).withRowSchema(INPUT_SCHEMA)); + + // Notice locationPrefix is "some/prefix/" but the absolute path doesn't start with it + AddFiles addFiles = + new AddFiles(catalogConfig, tableId.toString(), "some/prefix/", null, null, 1, null); + PCollectionRowTuple outputTuple = PCollectionRowTuple.of("input", inputFiles).apply(addFiles); + + PAssert.that(outputTuple.get("errors")) + .containsInAnyOrder( + Row.withSchema(AddFiles.ERROR_SCHEMA) 
+ .addValues(file1, "File path did not start with the specified prefix") + .build()); + + pipeline.run().waitUntilFinish(); + } + + private DataWriter createWriter(String file) throws IOException { + return createWriter(file, null); + } + + private DataWriter createWriter(String file, @Nullable StructLike partition) + throws IOException { + return Parquet.writeData(Files.localOutput(file)) + .schema(icebergSchema) + .withSpec(partition != null ? spec : PartitionSpec.unpartitioned()) + .withPartition(partition) + .createWriterFunc(GenericParquetWriter::create) + .build(); + } + + private Record record(int id, String name, int age) { + return GenericRecord.create(icebergSchema).copy("id", id, "name", name, "age", age); + } +} diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 24c18f382817..e175f646d599 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -970,10 +970,10 @@ public Progress getProgress() { } } - private static class BeamParquetInputFile implements InputFile { + public static class BeamParquetInputFile implements InputFile { private final SeekableByteChannel seekableByteChannel; - BeamParquetInputFile(SeekableByteChannel seekableByteChannel) { + public BeamParquetInputFile(SeekableByteChannel seekableByteChannel) { this.seekableByteChannel = seekableByteChannel; }