From e34370a08976974cda3ab95cff294cc28bf867e1 Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Wed, 10 Jun 2026 13:08:22 -0700 Subject: [PATCH 1/2] Add Delta Lake source to the Java Managed API --- .../pipeline/v1/external_transforms.proto | 2 + sdks/java/io/delta/build.gradle | 1 + .../DeltaReadSchemaTransformProvider.java | 140 ++++++++++++++++++ .../apache/beam/sdk/io/delta/DeltaIOTest.java | 41 +++++ .../org/apache/beam/sdk/managed/Managed.java | 4 + 5 files changed, 188 insertions(+) create mode 100644 sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 043a72dd34f2..c73986eed48b 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -88,6 +88,8 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:sql_server_read:v1"]; SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:sql_server_write:v1"]; + DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:delta_lake_read:v1"]; } } diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle index c07aef6981b9..57b1cd8ad876 100644 --- a/sdks/java/io/delta/build.gradle +++ b/sdks/java/io/delta/build.gradle @@ -33,6 +33,7 @@ def parquet_version = "1.16.0" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":model:pipeline", configuration: "shadow") implementation library.java.delta_kernel_api implementation library.java.delta_kernel_defaults diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java new file mode 100644 index 000000000000..f8d6b87592fa --- /dev/null +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * SchemaTransform implementation for {@link DeltaIO#readRows}. Reads records from Delta Lake and + * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link + * org.apache.beam.sdk.values.Row}s. + */ +@AutoService(SchemaTransformProvider.class) +public class DeltaReadSchemaTransformProvider extends TypedSchemaTransformProvider { + static final String OUTPUT_TAG = "output"; + + @Override + protected SchemaTransform from(Configuration configuration) { + return new DeltaReadSchemaTransform(configuration); + } + + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + @Override + public String identifier() { + return getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ); + } + + static class DeltaReadSchemaTransform extends SchemaTransform { + private final Configuration configuration; + + DeltaReadSchemaTransform(Configuration configuration) { + this.configuration = configuration; + } + + Row getConfigurationRow() { + try { + return SchemaRegistry.createDefault() + .getToRowFunction(Configuration.class) + .apply(configuration) + .sorted() + .toSnakeCase(); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + DeltaIO.ReadRows read = DeltaIO.readRows().from(configuration.getTable()); + if (configuration.getVersion() != null) { + read = read.withVersion(configuration.getVersion()); + } + if (configuration.getTimestamp() != null) { + read = read.withTimestamp(configuration.getTimestamp()); + } + Map hadoopConfig = configuration.getHadoopConfig(); + if (hadoopConfig != null) { + read = read.withConfig(hadoopConfig); + } + + PCollection output = input.getPipeline().apply(read); + + return PCollectionRowTuple.of(OUTPUT_TAG, output); + } + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + static Builder builder() { + return new AutoValue_DeltaReadSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("Identifier of the Delta Lake table.") + abstract String getTable(); + + @SchemaFieldDescription("Version of the Delta Lake table to read.") + @Nullable + abstract Long getVersion(); + + @SchemaFieldDescription("Timestamp of the Delta Lake table to read.") + @Nullable + abstract String getTimestamp(); + + @SchemaFieldDescription("Properties passed to the Hadoop Configuration.") + @Nullable + abstract Map getHadoopConfig(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setTable(String table); + + abstract Builder setVersion(@Nullable Long version); + + abstract Builder setTimestamp(@Nullable String timestamp); + + abstract Builder setHadoopConfig(@Nullable Map hadoopConfig); + + abstract Configuration build(); + } + } +} diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java index ef6dd660c607..bd8bf8b3c8cc 100644 --- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.delta.DeltaIO.ReadRows; import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -52,6 +53,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -387,6 +389,45 @@ private byte[] writeParquetFile(File file, Row row) throws Exception { return java.nio.file.Files.readAllBytes(file.toPath()); } + @Test + public void testManagedDeltaRead() throws Exception { + File tableDir = tempFolder.newFolder("managed-delta-table"); + + // 1. Write a Parquet file to simulate a Delta table + Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build(); + Row row = Row.withSchema(schema).addValues("test-name").build(); + writeParquetFile(new File(tableDir, "part-00000.parquet"), row); + + // 2. Create the Delta log + File logDir = new File(tableDir, "_delta_log"); + logDir.mkdirs(); + File commitFile = new File(logDir, "00000000000000000000.json"); + + File parquetFile = new File(tableDir, "part-00000.parquet"); + byte[] fileBytes = Files.readAllBytes(parquetFile.toPath()); + + String commitContent = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n" + + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":" + + fileBytes.length + + ",\"modificationTime\":123456789,\"dataChange\":true}}"; + + Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8)); + + // 3. Read it using Managed + PCollection output = + readPipeline + .apply( + Managed.read(Managed.DELTA_LAKE) + .withConfig(ImmutableMap.of("table", tableDir.getAbsolutePath()))) + .getSinglePCollection(); + + PAssert.that(output).containsInAnyOrder(row); + + readPipeline.run().waitUntilFinish(); + } + @Test @org.junit.Ignore("Manual integration test with external local table") public void testReadingLocalTable() throws Exception { diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index a5e7d879b441..9589992e079a 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -93,6 +93,7 @@ public class Managed { // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; + public static final String DELTA_LAKE = "delta"; public static final String ICEBERG_CDC = "iceberg_cdc"; public static final String KAFKA = "kafka"; public static final String BIGQUERY = "bigquery"; @@ -104,6 +105,7 @@ public class Managed { public static final Map READ_TRANSFORMS = ImmutableMap.builder() .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ)) + .put(DELTA_LAKE, getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ)) .put(ICEBERG_CDC, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ)) .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ)) @@ -128,6 +130,8 @@ public class Managed { *