Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:sql_server_read:v1"];
SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:sql_server_write:v1"];
DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:delta_lake_read:v1"];
}
}

Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/delta/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def parquet_version = "1.16.0"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation library.java.delta_kernel_api
implementation library.java.delta_kernel_defaults

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.delta;

import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration;
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* SchemaTransform implementation for {@link DeltaIO#readRows}. Reads records from Delta Lake and
* outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link
* org.apache.beam.sdk.values.Row}s.
*/
@AutoService(SchemaTransformProvider.class)
public class DeltaReadSchemaTransformProvider extends TypedSchemaTransformProvider<Configuration> {
static final String OUTPUT_TAG = "output";

@Override
protected SchemaTransform from(Configuration configuration) {
return new DeltaReadSchemaTransform(configuration);
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG);
}

@Override
public String identifier() {
return getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ);
}

static class DeltaReadSchemaTransform extends SchemaTransform {
private final Configuration configuration;

DeltaReadSchemaTransform(Configuration configuration) {
this.configuration =
java.util.Objects.requireNonNull(configuration, "configuration cannot be null");
}
Comment thread
chamikaramj marked this conversation as resolved.

Row getConfigurationRow() {
try {
return SchemaRegistry.createDefault()
.getToRowFunction(Configuration.class)
.apply(configuration)
.sorted()
.toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e);
}
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
DeltaIO.ReadRows read = DeltaIO.readRows().from(configuration.getTable());
if (configuration.getVersion() != null) {
read = read.withVersion(configuration.getVersion());
}
if (configuration.getTimestamp() != null) {
read = read.withTimestamp(configuration.getTimestamp());
}
Map<String, String> hadoopConfig = configuration.getHadoopConfig();
if (hadoopConfig != null) {
read = read.withConfig(hadoopConfig);
}

PCollection<Row> output = input.getPipeline().apply(read);

return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Configuration {
static Builder builder() {
return new AutoValue_DeltaReadSchemaTransformProvider_Configuration.Builder();
}

@SchemaFieldDescription("Identifier of the Delta Lake table.")
abstract String getTable();

@SchemaFieldDescription("Version of the Delta Lake table to read.")
@Nullable
abstract Long getVersion();

@SchemaFieldDescription("Timestamp of the Delta Lake table to read.")
@Nullable
abstract String getTimestamp();

@SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
@Nullable
abstract Map<String, String> getHadoopConfig();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setTable(String table);

abstract Builder setVersion(@Nullable Long version);

abstract Builder setTimestamp(@Nullable String timestamp);

abstract Builder setHadoopConfig(@Nullable Map<String, String> hadoopConfig);

abstract Configuration build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.delta.DeltaIO.ReadRows;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.managed.Managed;
Comment thread
chamikaramj marked this conversation as resolved.
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand All @@ -52,6 +53,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -387,6 +389,45 @@ private byte[] writeParquetFile(File file, Row row) throws Exception {
return java.nio.file.Files.readAllBytes(file.toPath());
}

/**
 * Builds a minimal single-commit Delta table on local disk (one Parquet data file plus a
 * hand-written {@code _delta_log} entry) and verifies that {@code Managed.read(DELTA_LAKE)}
 * returns the row that was written.
 */
@Test
public void testManagedDeltaRead() throws Exception {
  File tableDir = tempFolder.newFolder("managed-delta-table");

  // 1. Write a Parquet file to simulate a Delta table. The helper returns the file's bytes, so
  //    the size recorded in the commit does not need a second read from disk.
  Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build();
  Row row = Row.withSchema(schema).addValues("test-name").build();
  byte[] fileBytes = writeParquetFile(new File(tableDir, "part-00000.parquet"), row);

  // 2. Create the Delta log. Fail fast if the directory cannot be created instead of surfacing
  //    a less obvious error when the commit file is written.
  File logDir = new File(tableDir, "_delta_log");
  Assert.assertTrue("Failed to create Delta log directory: " + logDir, logDir.mkdirs());
  File commitFile = new File(logDir, "00000000000000000000.json");

  // One JSON action per line: protocol, table metadata, then the "add" for the data file.
  String commitContent =
      "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n"
          + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n"
          + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":"
          + fileBytes.length
          + ",\"modificationTime\":123456789,\"dataChange\":true}}";

  Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8));

  // 3. Read it using Managed
  PCollection<Row> output =
      readPipeline
          .apply(
              Managed.read(Managed.DELTA_LAKE)
                  .withConfig(ImmutableMap.of("table", tableDir.getAbsolutePath())))
          .getSinglePCollection();

  PAssert.that(output).containsInAnyOrder(row);

  readPipeline.run().waitUntilFinish();
}

@Test
@org.junit.Ignore("Manual integration test with external local table")
public void testReadingLocalTable() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class Managed {

// TODO: Dynamically generate a list of supported transforms
public static final String ICEBERG = "iceberg";
public static final String DELTA_LAKE = "delta";
public static final String ICEBERG_CDC = "iceberg_cdc";
public static final String KAFKA = "kafka";
public static final String BIGQUERY = "bigquery";
Expand All @@ -104,6 +105,7 @@ public class Managed {
public static final Map<String, String> READ_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
.put(DELTA_LAKE, getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ))
.put(ICEBERG_CDC, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
.put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
Expand All @@ -128,6 +130,8 @@ public class Managed {
* <ul>
* <li>{@link Managed#ICEBERG} : Read from Apache Iceberg tables using <a
* href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/iceberg/IcebergIO.html">IcebergIO</a>
* <li>{@link Managed#DELTA_LAKE} : Read from Delta Lake tables using <a
* href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/delta/DeltaIO.html">DeltaIO</a>
Comment thread
derrickaw marked this conversation as resolved.
* <li>{@link Managed#ICEBERG_CDC} : CDC Read from Apache Iceberg tables using <a
* href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/iceberg/IcebergIO.html">IcebergIO</a>
* <li>{@link Managed#KAFKA} : Read from Apache Kafka topics using <a
Expand Down
Loading