From a34dfbf7b69975f737883e36c18952c593081730 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 4 Jun 2026 15:03:21 +0000 Subject: [PATCH 1/7] make changes to support delta --- sdks/java/io/expansion-service/build.gradle | 5 ++-- .../apache_beam/yaml/integration_tests.py | 23 +++++++++++++++++ sdks/python/apache_beam/yaml/standard_io.yaml | 19 ++++++++++++++ sdks/standard_external_transforms.yaml | 25 ++++++++++++++++++- 4 files changed, 69 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 32894b978094..c1db7db0fc51 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -25,8 +25,8 @@ applyJavaNature( exportJavadoc: false, validateShadowJar: false, shadowClosure: {}, - // iceberg requires Java11+ - requireJavaVersion: JavaVersion.VERSION_11 + // delta and iceberg require Java17+ / Java11+ + requireJavaVersion: JavaVersion.VERSION_17 ) // We don't want to use the latest version for the entire beam sdk since beam Java users can override it themselves. @@ -97,6 +97,7 @@ dependencies { runtimeOnly library.java.bigdataoss_util_hadoop runtimeOnly project(":sdks:java:io:mongodb") + runtimeOnly project(":sdks:java:io:delta") runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 150c0ca86254..fcf5a9f996af 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -20,6 +20,7 @@ import contextlib import copy import glob +import tempfile import itertools import logging import os @@ -36,6 +37,8 @@ import psycopg2 import pytds import sqlalchemy +import pyarrow as pa +import pyarrow.parquet as pq import yaml from apitools.base.py.exceptions import HttpError from google.cloud import pubsub_v1 @@ -584,6 +587,26 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"): yield created_topic_object.name +@contextlib.contextmanager +def temp_delta_table(): + with tempfile.TemporaryDirectory() as temp_dir: + log_dir = os.path.join(temp_dir, "_delta_log") + os.makedirs(log_dir, exist_ok=True) + table_data = pa.table({"name": ["a", "b", "c"]}) + parquet_path = os.path.join(temp_dir, "part-00000.parquet") + pq.write_table(table_data, parquet_path) + file_size = os.path.getsize(parquet_path) + commit_content = ( + '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}\n' + '{"metaData":{"id":"test-id","format":{"provider":"parquet","options":{}},"schemaString":"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n' + f'{{"add":{{"path":"part-00000.parquet","partitionValues":{{}},"size":{file_size},"modificationTime":123456789,"dataChange":true}}}}\n' + ) + commit_file = os.path.join(log_dir, "00000000000000000000.json") + with open(commit_file, "w") as f: + f.write(commit_content) + yield temp_dir + + def replace_recursive(spec, vars): """Recursively replaces string placeholders in a spec with values from vars. diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 1790dc0f5a9a..83b0d8b51339 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -449,3 +449,22 @@ 'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1' config: gradle_target: 'sdks:java:io:expansion-service:shadowJar' + +#Delta Lake +- type: renaming + transforms: + 'ReadFromDelta': 'ReadFromDelta' + config: + mappings: + 'ReadFromDelta': + table_path: 'table_path' + version: 'version' + timestamp: 'timestamp' + hadoop_config: 'hadoop_config' + underlying_provider: + type: beamJar + transforms: + 'ReadFromDelta': 'beam:schematransform:org.apache.beam:delta_read:v1' + config: + gradle_target: 'sdks:java:io:expansion-service:shadowJar' + diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 057c4e3f47d1..d53f12fb465b 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,8 +19,31 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2026-05-06 +# Last updated on: 2026-06-04 +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + - description: Hadoop configuration properties. + name: hadoop_config + nullable: true + type: map[str, str] + - description: Path of the Delta Lake table. + name: table_path + nullable: false + type: str + - description: Timestamp of the Delta Lake table to read. + name: timestamp + nullable: true + type: str + - description: Version of the Delta Lake table to read. + name: version + nullable: true + type: int64 + identifier: beam:schematransform:org.apache.beam:delta_read:v1 + name: DeltaRead - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 number called "value". The count is produced from the given "start" value and From d7888eaf97a014caa796749bc297ed09761f7511 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 4 Jun 2026 15:03:52 +0000 Subject: [PATCH 2/7] add transformprovider and yaml file --- .../DeltaReadSchemaTransformProvider.java | 123 +++++++++++++++++ .../DeltaReadSchemaTransformProviderTest.java | 128 ++++++++++++++++++ sdks/python/apache_beam/yaml/tests/delta.yaml | 34 +++++ 3 files changed, 285 insertions(+) create mode 100644 sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java create mode 100644 sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java create mode 100644 sdks/python/apache_beam/yaml/tests/delta.yaml diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java new file mode 100644 index 000000000000..b047deb95a4b --- /dev/null +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * SchemaTransform implementation for {@link DeltaIO#readRows}. Reads records from Delta Lake and + * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link Row}s. + */ +@AutoService(SchemaTransformProvider.class) +public class DeltaReadSchemaTransformProvider extends TypedSchemaTransformProvider { + static final String OUTPUT_TAG = "output"; + static final String IDENTIFIER = "beam:schematransform:org.apache.beam:delta_read:v1"; + + @Override + protected SchemaTransform from(Configuration configuration) { + return new DeltaReadSchemaTransform(configuration); + } + + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + static class DeltaReadSchemaTransform extends SchemaTransform { + private final Configuration configuration; + + DeltaReadSchemaTransform(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + DeltaIO.ReadRows readRows = DeltaIO.readRows().from(configuration.getTablePath()); + if (configuration.getVersion() != null) { + readRows = readRows.withVersion(configuration.getVersion()); + } + if (configuration.getTimestamp() != null) { + readRows = readRows.withTimestamp(configuration.getTimestamp()); + } + Map hadoopConfig = configuration.getHadoopConfig(); + if (hadoopConfig != null) { + readRows = readRows.withConfig(hadoopConfig); + } + + PCollection output = input.getPipeline().apply(readRows); + return PCollectionRowTuple.of(OUTPUT_TAG, output); + } + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + static Builder builder() { + return new AutoValue_DeltaReadSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("Path of the Delta Lake table.") + abstract String getTablePath(); + + @SchemaFieldDescription("Version of the Delta Lake table to read.") + @Nullable + abstract Long getVersion(); + + @SchemaFieldDescription("Timestamp of the Delta Lake table to read.") + @Nullable + abstract String getTimestamp(); + + @SchemaFieldDescription("Hadoop configuration properties.") + @Nullable + abstract Map getHadoopConfig(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setTablePath(String tablePath); + + abstract Builder setVersion(@Nullable Long version); + + abstract Builder setTimestamp(@Nullable String timestamp); + + abstract Builder setHadoopConfig(@Nullable Map hadoopConfig); + + abstract Configuration build(); + } + } +} diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java new file mode 100644 index 000000000000..6a513cb33edf --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.OUTPUT_TAG; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DeltaReadSchemaTransformProvider}. */ +@RunWith(JUnit4.class) +public class DeltaReadSchemaTransformProviderTest { + + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testBuildTransformWithRow() { + java.util.Map hadoopConfig = new java.util.HashMap<>(); + hadoopConfig.put("fs.gs.project.id", "test-project"); + + Row config = + Row.withSchema(new DeltaReadSchemaTransformProvider().configurationSchema()) + .withFieldValue("table_path", "/path/to/table") + .withFieldValue("version", 5L) + .withFieldValue("timestamp", "2026-06-04T12:00:00Z") + .withFieldValue("hadoop_config", hadoopConfig) + .build(); + + new DeltaReadSchemaTransformProvider().from(config); + } + + @Test + public void testSimpleScan() throws Exception { + File tableDir = tempFolder.newFolder("delta-table-simple"); + + // 1. Write a Parquet file using Beam + Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build(); + Row row = Row.withSchema(schema).addValues("test-name").build(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema); + GenericRecord record = AvroUtils.toGenericRecord(row, avroSchema); + + writePipeline + .apply("Create Input", Create.of(record).withCoder(AvroCoder.of(avroSchema))) + .apply( + "Write Parquet", + FileIO.write() + .via(ParquetIO.sink(avroSchema)) + .to(tableDir.getAbsolutePath() + "/") + .withNaming( + (BoundedWindow window, + PaneInfo paneInfo, + int numShards, + int shardIndex, + Compression compression) -> "part-00000.parquet")); + + writePipeline.run().waitUntilFinish(); + + File parquetFile = new File(tableDir, "part-00000.parquet"); + byte[] fileBytes = Files.readAllBytes(parquetFile.toPath()); + + // 2. Create the Delta log + File logDir = new File(tableDir, "_delta_log"); + logDir.mkdirs(); + File commitFile = new File(logDir, "00000000000000000000.json"); + + String commitContent = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n" + + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":" + + fileBytes.length + + ",\"modificationTime\":123456789,\"dataChange\":true}}"; + + Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8)); + + // 3. Read it using DeltaReadSchemaTransformProvider + Configuration readConfig = + Configuration.builder().setTablePath(tableDir.getAbsolutePath()).build(); + + PCollection output = + PCollectionRowTuple.empty(readPipeline) + .apply(new DeltaReadSchemaTransformProvider().from(readConfig)) + .get(OUTPUT_TAG); + + PAssert.that(output).containsInAnyOrder(row); + + readPipeline.run().waitUntilFinish(); + } +} diff --git a/sdks/python/apache_beam/yaml/tests/delta.yaml b/sdks/python/apache_beam/yaml/tests/delta.yaml new file mode 100644 index 000000000000..a38e0750d633 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/delta.yaml @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: DELTA_TABLE + type: "apache_beam.yaml.integration_tests.temp_delta_table" + +pipelines: + - pipeline: + type: chain + transforms: + - type: ReadFromDelta + config: + table_path: "{DELTA_TABLE}" + - type: AssertEqual + config: + elements: + - {name: "a"} + - {name: "b"} + - {name: "c"} From 7bce23329c5c8ba470fd9e2f9170f1b03cf60f00 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 4 Jun 2026 15:32:20 +0000 Subject: [PATCH 3/7] add managed --- .../pipeline/v1/external_transforms.proto | 2 ++ .../python/apache_beam/transforms/external.py | 1 + sdks/python/apache_beam/transforms/managed.py | 4 +++- sdks/python/apache_beam/yaml/standard_io.yaml | 19 +-------------- sdks/python/apache_beam/yaml/yaml_io.py | 23 +++++++++++++++++++ sdks/standard_expansion_services.yaml | 1 + sdks/standard_external_transforms.yaml | 23 ------------------- 7 files changed, 31 insertions(+), 42 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 043a72dd34f2..7bce9494c88f 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -88,6 +88,8 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:sql_server_read:v1"]; SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:sql_server_write:v1"]; + DELTA_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:delta_read:v1"]; } } diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 9469ac717dfc..5bd22c44d485 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -86,6 +86,7 @@ ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.SQL_SERVER_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.DELTA_READ.urn: _IO_EXPANSION_SERVICE_JAR_TARGET, } diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 3f1342229ae8..13bdf986ced9 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -88,8 +88,9 @@ POSTGRES = "postgres" MYSQL = "mysql" SQL_SERVER = "sqlserver" +DELTA = "delta" -__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] +__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "DELTA", "Read", "Write"] class Read(PTransform): @@ -102,6 +103,7 @@ class Read(PTransform): POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn, MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn, SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_READ.urn, + DELTA: ManagedTransforms.Urns.DELTA_READ.urn, } def __init__( diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 83b0d8b51339..5d1fa751a8cc 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -116,6 +116,7 @@ 'ReadFromTFRecord': 'apache_beam.yaml.yaml_io.read_from_tfrecord' 'WriteToTFRecord': 'apache_beam.yaml.yaml_io.write_to_tfrecord' 'WriteToMongoDB': 'apache_beam.yaml.yaml_io.write_to_mongodb' + 'ReadFromDelta': 'apache_beam.yaml.yaml_io.read_from_delta' # General File Formats @@ -450,21 +451,3 @@ config: gradle_target: 'sdks:java:io:expansion-service:shadowJar' -#Delta Lake -- type: renaming - transforms: - 'ReadFromDelta': 'ReadFromDelta' - config: - mappings: - 'ReadFromDelta': - table_path: 'table_path' - version: 'version' - timestamp: 'timestamp' - hadoop_config: 'hadoop_config' - underlying_provider: - type: beamJar - transforms: - 'ReadFromDelta': 'beam:schematransform:org.apache.beam:delta_read:v1' - config: - gradle_target: 'sdks:java:io:expansion-service:shadowJar' - diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 989661a6eae4..dc8993859692 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -550,6 +550,29 @@ def read_from_iceberg( config_properties=config_properties)) +def read_from_delta( + table_path: str, + version: Optional[int] = None, + timestamp: Optional[str] = None, + hadoop_config: Optional[Mapping[str, str]] = None, +): + """Reads a Delta Lake table. + + Args: + table_path: Path of the Delta Lake table. + version: Version of the Delta Lake table to read. + timestamp: Timestamp of the Delta Lake table to read. + hadoop_config: Hadoop configuration properties. + """ + return beam.managed.Read( + "delta", + config=dict( + table_path=table_path, + version=version, + timestamp=timestamp, + hadoop_config=hadoop_config)) + + def write_to_iceberg( table: str, catalog_name: Optional[str] = None, diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml index 531caca5a376..43ff0f7ebf89 100644 --- a/sdks/standard_expansion_services.yaml +++ b/sdks/standard_expansion_services.yaml @@ -52,6 +52,7 @@ - 'beam:schematransform:org.apache.beam:iceberg_write:v1' - 'beam:schematransform:org.apache.beam:iceberg_read:v1' - 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' + - 'beam:schematransform:org.apache.beam:delta_read:v1' # TODO(ahmedabu98): Enable this service in a future PR #- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index d53f12fb465b..5976f04e1556 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -21,29 +21,6 @@ # # Last updated on: 2026-06-04 -- default_service: sdks:java:io:expansion-service:shadowJar - description: '' - destinations: - python: apache_beam/io - fields: - - description: Hadoop configuration properties. - name: hadoop_config - nullable: true - type: map[str, str] - - description: Path of the Delta Lake table. - name: table_path - nullable: false - type: str - - description: Timestamp of the Delta Lake table to read. - name: timestamp - nullable: true - type: str - - description: Version of the Delta Lake table to read. - name: version - nullable: true - type: int64 - identifier: beam:schematransform:org.apache.beam:delta_read:v1 - name: DeltaRead - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 number called "value". The count is produced from the given "start" value and From 8dc2a4f7e402ec36cd6603fc85bdf87c06022239 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 10 Jun 2026 14:28:03 +0000 Subject: [PATCH 4/7] fix lint --- sdks/python/apache_beam/yaml/integration_tests.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index fcf5a9f996af..8477988fadc2 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -20,7 +20,6 @@ import contextlib import copy import glob -import tempfile import itertools import logging import os @@ -28,6 +27,7 @@ import secrets import sqlite3 import string +import tempfile import unittest import uuid from datetime import datetime @@ -35,10 +35,10 @@ import mock import psycopg2 -import pytds -import sqlalchemy import pyarrow as pa import pyarrow.parquet as pq +import pytds +import sqlalchemy import yaml from apitools.base.py.exceptions import HttpError from google.cloud import pubsub_v1 From e34370a08976974cda3ab95cff294cc28bf867e1 Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Wed, 10 Jun 2026 13:08:22 -0700 Subject: [PATCH 5/7] Add Delta Lake source to the Java Managed API --- .../pipeline/v1/external_transforms.proto | 2 + sdks/java/io/delta/build.gradle | 1 + .../DeltaReadSchemaTransformProvider.java | 140 ++++++++++++++++++ .../apache/beam/sdk/io/delta/DeltaIOTest.java | 41 +++++ .../org/apache/beam/sdk/managed/Managed.java | 4 + 5 files changed, 188 insertions(+) create mode 100644 sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 043a72dd34f2..c73986eed48b 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -88,6 +88,8 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:sql_server_read:v1"]; SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:sql_server_write:v1"]; + DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:delta_lake_read:v1"]; } } diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle index c07aef6981b9..57b1cd8ad876 100644 --- a/sdks/java/io/delta/build.gradle +++ b/sdks/java/io/delta/build.gradle @@ -33,6 +33,7 @@ def parquet_version = "1.16.0" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":model:pipeline", configuration: "shadow") implementation library.java.delta_kernel_api implementation library.java.delta_kernel_defaults diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java new file mode 100644 index 000000000000..f8d6b87592fa --- /dev/null +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * SchemaTransform implementation for {@link DeltaIO#readRows}. Reads records from Delta Lake and + * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link + * org.apache.beam.sdk.values.Row}s. + */ +@AutoService(SchemaTransformProvider.class) +public class DeltaReadSchemaTransformProvider extends TypedSchemaTransformProvider { + static final String OUTPUT_TAG = "output"; + + @Override + protected SchemaTransform from(Configuration configuration) { + return new DeltaReadSchemaTransform(configuration); + } + + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + @Override + public String identifier() { + return getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ); + } + + static class DeltaReadSchemaTransform extends SchemaTransform { + private final Configuration configuration; + + DeltaReadSchemaTransform(Configuration configuration) { + this.configuration = configuration; + } + + Row getConfigurationRow() { + try { + return SchemaRegistry.createDefault() + .getToRowFunction(Configuration.class) + .apply(configuration) + .sorted() + .toSnakeCase(); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + DeltaIO.ReadRows read = DeltaIO.readRows().from(configuration.getTable()); + if (configuration.getVersion() != null) { + read = read.withVersion(configuration.getVersion()); + } + if (configuration.getTimestamp() != null) { + read = read.withTimestamp(configuration.getTimestamp()); + } + Map hadoopConfig = configuration.getHadoopConfig(); + if (hadoopConfig != null) { + read = read.withConfig(hadoopConfig); + } + + PCollection output = input.getPipeline().apply(read); + + return PCollectionRowTuple.of(OUTPUT_TAG, output); + } + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + static Builder builder() { + return new AutoValue_DeltaReadSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("Identifier of the Delta Lake table.") + abstract String getTable(); + + @SchemaFieldDescription("Version of the Delta Lake table to read.") + @Nullable + abstract Long getVersion(); + + @SchemaFieldDescription("Timestamp of the Delta Lake table to read.") + @Nullable + abstract String getTimestamp(); + + @SchemaFieldDescription("Properties passed to the Hadoop Configuration.") + @Nullable + abstract Map getHadoopConfig(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setTable(String table); + + abstract Builder setVersion(@Nullable Long version); + + abstract Builder setTimestamp(@Nullable String timestamp); + + abstract Builder setHadoopConfig(@Nullable Map hadoopConfig); + + abstract Configuration build(); + } + } +} diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java index ef6dd660c607..bd8bf8b3c8cc 100644 --- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.delta.DeltaIO.ReadRows; import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -52,6 +53,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -387,6 +389,45 @@ private byte[] writeParquetFile(File file, Row row) throws Exception { return java.nio.file.Files.readAllBytes(file.toPath()); } + @Test + public void testManagedDeltaRead() throws Exception { + File tableDir = tempFolder.newFolder("managed-delta-table"); + + // 1. Write a Parquet file to simulate a Delta table + Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build(); + Row row = Row.withSchema(schema).addValues("test-name").build(); + writeParquetFile(new File(tableDir, "part-00000.parquet"), row); + + // 2. Create the Delta log + File logDir = new File(tableDir, "_delta_log"); + logDir.mkdirs(); + File commitFile = new File(logDir, "00000000000000000000.json"); + + File parquetFile = new File(tableDir, "part-00000.parquet"); + byte[] fileBytes = Files.readAllBytes(parquetFile.toPath()); + + String commitContent = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n" + + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":" + + fileBytes.length + + ",\"modificationTime\":123456789,\"dataChange\":true}}"; + + Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8)); + + // 3. Read it using Managed + PCollection output = + readPipeline + .apply( + Managed.read(Managed.DELTA_LAKE) + .withConfig(ImmutableMap.of("table", tableDir.getAbsolutePath()))) + .getSinglePCollection(); + + PAssert.that(output).containsInAnyOrder(row); + + readPipeline.run().waitUntilFinish(); + } + @Test @org.junit.Ignore("Manual integration test with external local table") public void testReadingLocalTable() throws Exception { diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index a5e7d879b441..9589992e079a 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -93,6 +93,7 @@ public class Managed { // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; + public static final String DELTA_LAKE = "delta"; public static final String ICEBERG_CDC = "iceberg_cdc"; public static final String KAFKA = "kafka"; public static final String BIGQUERY = "bigquery"; @@ -104,6 +105,7 @@ public class Managed { public static final Map READ_TRANSFORMS = ImmutableMap.builder() .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ)) + .put(DELTA_LAKE, getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ)) .put(ICEBERG_CDC, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ)) .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ)) @@ -128,6 +130,8 @@ public class Managed { *