diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 043a72dd34f2..c73986eed48b 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -88,6 +88,8 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:sql_server_read:v1"]; SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:sql_server_write:v1"]; + DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:delta_lake_read:v1"]; } } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java index d060d5916e9f..ca97ed287a0a 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java @@ -88,6 +88,7 @@ public List outputCollectionNames() { @DefaultSchema(AutoValueSchema.class) @AutoValue + @SuppressWarnings("mutable") public abstract static class Configuration { @SuppressWarnings({"AutoValueMutable", "mutable"}) diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle index c07aef6981b9..57b1cd8ad876 100644 --- a/sdks/java/io/delta/build.gradle +++ b/sdks/java/io/delta/build.gradle @@ -33,6 +33,7 @@ def parquet_version = "1.16.0" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":model:pipeline", configuration: "shadow") implementation library.java.delta_kernel_api implementation library.java.delta_kernel_defaults diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java new file mode 100644 index 000000000000..42ca3f24def9 --- /dev/null +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * SchemaTransform implementation for {@link DeltaIO#readRows}. Reads records from Delta Lake and + * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link + * org.apache.beam.sdk.values.Row}s. + */ +@AutoService(SchemaTransformProvider.class) +public class DeltaReadSchemaTransformProvider extends TypedSchemaTransformProvider { + static final String OUTPUT_TAG = "output"; + + @Override + protected SchemaTransform from(Configuration configuration) { + return new DeltaReadSchemaTransform(configuration); + } + + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + @Override + public String identifier() { + return getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ); + } + + static class DeltaReadSchemaTransform extends SchemaTransform { + private final Configuration configuration; + + DeltaReadSchemaTransform(Configuration configuration) { + this.configuration = + java.util.Objects.requireNonNull(configuration, "configuration cannot be null"); + } + + Row getConfigurationRow() { + try { + return SchemaRegistry.createDefault() + .getToRowFunction(Configuration.class) + .apply(configuration) + .sorted() + .toSnakeCase(); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + DeltaIO.ReadRows read = DeltaIO.readRows().from(configuration.getTable()); + if (configuration.getVersion() != null) { + read = read.withVersion(configuration.getVersion()); + } + if (configuration.getTimestamp() != null) { + read = read.withTimestamp(configuration.getTimestamp()); + } + Map hadoopConfig = configuration.getHadoopConfig(); + if (hadoopConfig != null) { + read = read.withConfig(hadoopConfig); + } + + PCollection output = input.getPipeline().apply(read); + + return PCollectionRowTuple.of(OUTPUT_TAG, output); + } + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + static Builder builder() { + return new AutoValue_DeltaReadSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("Identifier of the Delta Lake table.") + abstract String getTable(); + + @SchemaFieldDescription("Version of the Delta Lake table to read.") + @Nullable + abstract Long getVersion(); + + @SchemaFieldDescription("Timestamp of the Delta Lake table to read.") + @Nullable + abstract String getTimestamp(); + + @SchemaFieldDescription("Properties passed to the Hadoop Configuration.") + @Nullable + abstract Map getHadoopConfig(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setTable(String table); + + abstract Builder setVersion(@Nullable Long version); + + abstract Builder setTimestamp(@Nullable String timestamp); + + abstract Builder setHadoopConfig(@Nullable Map hadoopConfig); + + abstract Configuration build(); + } + } +} diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java index ef6dd660c607..bd8bf8b3c8cc 100644 --- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.delta.DeltaIO.ReadRows; import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -52,6 +53,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -387,6 +389,45 @@ private byte[] writeParquetFile(File file, Row row) throws Exception { return java.nio.file.Files.readAllBytes(file.toPath()); } + @Test + public void testManagedDeltaRead() throws Exception { + File tableDir = tempFolder.newFolder("managed-delta-table"); + + // 1. Write a Parquet file to simulate a Delta table + Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build(); + Row row = Row.withSchema(schema).addValues("test-name").build(); + writeParquetFile(new File(tableDir, "part-00000.parquet"), row); + + // 2. Create the Delta log + File logDir = new File(tableDir, "_delta_log"); + logDir.mkdirs(); + File commitFile = new File(logDir, "00000000000000000000.json"); + + File parquetFile = new File(tableDir, "part-00000.parquet"); + byte[] fileBytes = Files.readAllBytes(parquetFile.toPath()); + + String commitContent = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n" + + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":" + + fileBytes.length + + ",\"modificationTime\":123456789,\"dataChange\":true}}"; + + Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8)); + + // 3. Read it using Managed + PCollection output = + readPipeline + .apply( + Managed.read(Managed.DELTA_LAKE) + .withConfig(ImmutableMap.of("table", tableDir.getAbsolutePath()))) + .getSinglePCollection(); + + PAssert.that(output).containsInAnyOrder(row); + + readPipeline.run().waitUntilFinish(); + } + @Test @org.junit.Ignore("Manual integration test with external local table") public void testReadingLocalTable() throws Exception { diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java new file mode 100644 index 000000000000..77aef7bce494 --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.OUTPUT_TAG; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DeltaReadSchemaTransformProvider}. */ +@RunWith(JUnit4.class) +public class DeltaReadSchemaTransformProviderTest { + + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testBuildTransformWithRow() { + java.util.Map hadoopConfig = new java.util.HashMap<>(); + hadoopConfig.put("fs.gs.project.id", "test-project"); + + Row config = + Row.withSchema(new DeltaReadSchemaTransformProvider().configurationSchema()) + .withFieldValue("table", "/path/to/table") + .withFieldValue("version", 5L) + .withFieldValue("timestamp", "2026-06-04T12:00:00Z") + .withFieldValue("hadoop_config", hadoopConfig) + .build(); + + new DeltaReadSchemaTransformProvider().from(config); + } + + @Test + public void testSimpleScan() throws Exception { + File tableDir = tempFolder.newFolder("delta-table-simple"); + + // 1. Write a Parquet file using Beam + Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build(); + Row row = Row.withSchema(schema).addValues("test-name").build(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema); + GenericRecord record = AvroUtils.toGenericRecord(row, avroSchema); + + writePipeline + .apply("Create Input", Create.of(record).withCoder(AvroCoder.of(avroSchema))) + .apply( + "Write Parquet", + FileIO.write() + .via(ParquetIO.sink(avroSchema)) + .to(tableDir.getAbsolutePath() + "/") + .withNaming( + (BoundedWindow window, + PaneInfo paneInfo, + int numShards, + int shardIndex, + Compression compression) -> "part-00000.parquet")); + + writePipeline.run().waitUntilFinish(); + + File parquetFile = new File(tableDir, "part-00000.parquet"); + byte[] fileBytes = Files.readAllBytes(parquetFile.toPath()); + + // 2. Create the Delta log + File logDir = new File(tableDir, "_delta_log"); + logDir.mkdirs(); + File commitFile = new File(logDir, "00000000000000000000.json"); + + String commitContent = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n" + + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":" + + fileBytes.length + + ",\"modificationTime\":123456789,\"dataChange\":true}}"; + + Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8)); + + // 3. Read it using DeltaReadSchemaTransformProvider + Configuration readConfig = Configuration.builder().setTable(tableDir.getAbsolutePath()).build(); + + PCollection output = + PCollectionRowTuple.empty(readPipeline) + .apply(new DeltaReadSchemaTransformProvider().from(readConfig)) + .get(OUTPUT_TAG); + + PAssert.that(output).containsInAnyOrder(row); + + readPipeline.run().waitUntilFinish(); + } +} diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 32894b978094..c1db7db0fc51 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -25,8 +25,8 @@ applyJavaNature( exportJavadoc: false, validateShadowJar: false, shadowClosure: {}, - // iceberg requires Java11+ - requireJavaVersion: JavaVersion.VERSION_11 + // delta and iceberg require Java17+ / Java11+ + requireJavaVersion: JavaVersion.VERSION_17 ) // We don't want to use the latest version for the entire beam sdk since beam Java users can override it themselves. @@ -97,6 +97,7 @@ dependencies { runtimeOnly library.java.bigdataoss_util_hadoop runtimeOnly project(":sdks:java:io:mongodb") + runtimeOnly project(":sdks:java:io:delta") runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index a5e7d879b441..9589992e079a 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -93,6 +93,7 @@ public class Managed { // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; + public static final String DELTA_LAKE = "delta"; public static final String ICEBERG_CDC = "iceberg_cdc"; public static final String KAFKA = "kafka"; public static final String BIGQUERY = "bigquery"; @@ -104,6 +105,7 @@ public class Managed { public static final Map READ_TRANSFORMS = ImmutableMap.builder() .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ)) + .put(DELTA_LAKE, getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ)) .put(ICEBERG_CDC, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ)) .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ)) @@ -128,6 +130,8 @@ public class Managed { *
    *
  • {@link Managed#ICEBERG} : Read from Apache Iceberg tables using IcebergIO + *
  • {@link Managed#DELTA_LAKE} : Read from Delta Lake tables using DeltaIO *
  • {@link Managed#ICEBERG_CDC} : CDC Read from Apache Iceberg tables using IcebergIO *
  • {@link Managed#KAFKA} : Read from Apache Kafka topics using