diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java index 0caf10d55d..ab2b7b59af 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java @@ -169,15 +169,25 @@ private int findIndex(int[] array, int target) { public CloseableIterator pollBatch(Duration timeout) throws IOException { if (logScanFinished) { if (lakeRecordIterators.isEmpty()) { + List recordReaders = new ArrayList<>(); if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { - lakeRecordIterators = Collections.emptyList(); + // pass null split to get rowComparator + recordReaders.add(lakeSource.createRecordReader(() -> null)); } else { for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { - lakeRecordIterators.add( - lakeSource.createRecordReader(() -> lakeSplit).read()); + recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit)); } } + for (RecordReader reader : recordReaders) { + if (reader instanceof SortedRecordReader) { + rowComparator = ((SortedRecordReader) reader).order(); + } else { + throw new UnsupportedOperationException( + "lake records must be an instance of sorted view."); + } + lakeRecordIterators.add(reader.read()); + } } if (currentSortMergeReader == null) { currentSortMergeReader = diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java index d00a74a26b..64b3a6f34c 100644 ---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java @@ -22,6 +22,8 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema; import org.apache.fluss.flink.source.reader.LeaseContext; +import org.apache.fluss.lake.source.LakeSource; +import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.RowType; @@ -54,10 +56,16 @@ * } * * @param The type of records produced by this source + * @see FlussSourceBuilder */ public class FlussSource extends FlinkSource { private static final long serialVersionUID = 1L; + /** + * Creates a FlussSource without lake source (backward compatible constructor). + * + *

Use {@link #builder()} to create a FlussSource with more configuration options. + */ FlussSource( Configuration flussConf, TablePath tablePath, @@ -82,9 +90,41 @@ public class FlussSource extends FlinkSource { deserializationSchema, streaming, null, + null, LeaseContext.DEFAULT); } + /** Creates a FlussSource with optional lake source for union read. */ + FlussSource( + Configuration flussConf, + TablePath tablePath, + boolean hasPrimaryKey, + boolean isPartitioned, + RowType sourceOutputType, + @Nullable int[] projectedFields, + OffsetsInitializer offsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + FlussDeserializationSchema deserializationSchema, + boolean streaming, + @Nullable LakeSource lakeSource, + LeaseContext leaseContext) { + // TODO: Support partition pushDown in datastream + super( + flussConf, + tablePath, + hasPrimaryKey, + isPartitioned, + sourceOutputType, + projectedFields, + offsetsInitializer, + scanPartitionDiscoveryIntervalMs, + deserializationSchema, + streaming, + null, + lakeSource, + leaseContext); + } + /** * Get a FlussSourceBuilder to build a {@link FlussSource}. 
* diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java index afd955c01f..5e549cc923 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java @@ -21,10 +21,14 @@ import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.initializer.OffsetsInitializer; +import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema; +import org.apache.fluss.flink.source.reader.LeaseContext; +import org.apache.fluss.lake.source.LakeSource; +import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.RowType; @@ -38,6 +42,7 @@ import java.util.concurrent.ExecutionException; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource; /** * Builder class for creating {@link FlussSource} instances. 
@@ -60,6 +65,7 @@ * } * * @param The type of records produced by the source being built + * @see FlussSource */ public class FlussSourceBuilder { private static final Logger LOG = LoggerFactory.getLogger(FlussSourceBuilder.class); @@ -77,6 +83,12 @@ public class FlussSourceBuilder { private String database; private String tableName; + // whether to enable lake source for union read + private boolean lakeEnabled = false; + + // whether this is a streaming source (default: true for unbounded streaming mode) + private boolean streaming = true; + /** * Sets the bootstrap servers for the Fluss source connection. * @@ -116,6 +128,36 @@ public FlussSourceBuilder setTable(String table) { return this; } + /** + * Enables or disables lake source for union read. + * + *

When enabled and the table has data lake configured (e.g., Paimon/Iceberg), the source + * will read historical data from the lake and real-time data from Fluss. Union read only takes + * effect when the starting offset is {@link OffsetsInitializer#full()}. + * + * @param lakeEnabled true to enable lake source for union read, false to disable + * @return this builder + */ + public FlussSourceBuilder setLakeEnabled(boolean lakeEnabled) { + this.lakeEnabled = lakeEnabled; + return this; + } + + /** + * Sets whether this source should run in streaming mode or batch mode. + * + *

In streaming mode (default), the source is unbounded. In batch mode, the source is bounded + * and terminates after reading all available data. For Flink BATCH execution mode with lake + * enabled, set this to {@code false}. + * + * @param streaming true for streaming (unbounded) mode, false for batch (bounded) mode + * @return this builder + */ + public FlussSourceBuilder setStreaming(boolean streaming) { + this.streaming = streaming; + return this; + } + /** * Sets the scan partition discovery interval in milliseconds. * @@ -288,6 +330,41 @@ public FlussSource build() { ? tableInfo.getRowType().project(projectedFields) : tableInfo.getRowType(); + // create lake source if lake is enabled and table has data lake configured + LakeSource lakeSource = null; + if (lakeEnabled) { + // check if table has data lake format configured + Map tableOptions = new HashMap<>(); + tableOptions.putAll(tableInfo.getCustomProperties().toMap()); + tableOptions.putAll(tableInfo.getProperties().toMap()); + + org.apache.fluss.metadata.DataLakeFormat dataLakeFormat = + flussConf.get(ConfigOptions.TABLE_DATALAKE_FORMAT); + if (dataLakeFormat != null) { + // only enable union read when startup mode is FULL + // OffsetsInitializer.full() is the default and enables union read + if (offsetsInitializer instanceof SnapshotOffsetsInitializer) { + try { + lakeSource = createLakeSource(tablePath, tableOptions); + LOG.info("Lake source created for union read on table: {}", tablePath); + } catch (Exception e) { + LOG.warn( + "Failed to create lake source for table {}, union read will be disabled: {}", + tablePath, + e.getMessage()); + } + } else { + LOG.info( + "Lake is enabled but startup mode is not FULL, union read is disabled. 
" + + "Use OffsetsInitializer.full() to enable union read."); + } + } else { + LOG.info( + "Lake is enabled but table {} does not have data lake configured, union read is disabled.", + tablePath); + } + } + LOG.info("Creating Fluss Source with Configuration: {}", flussConf); return new FlussSource<>( @@ -300,6 +377,8 @@ public FlussSource build() { offsetsInitializer, scanPartitionDiscoveryIntervalMs, deserializationSchema, - true); + streaming, + lakeSource, + LeaseContext.DEFAULT); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java index 0f54fc04b4..4db53e6257 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java @@ -266,6 +266,59 @@ public void testProjectedFields() { assertThat(source).isNotNull(); } + @Test + public void testSetLakeEnabled() { + // Given - using full() which is the default, lakeEnabled = true + // Note: Without actual data lake configuration, lake source won't be created + // This test verifies the builder accepts the method call + FlussSource source = + FlussSource.builder() + .setBootstrapServers(bootstrapServers) + .setDatabase(DEFAULT_DB) + .setTable(DEFAULT_TABLE_PATH.getTableName()) + .setStartingOffsets(OffsetsInitializer.full()) + .setLakeEnabled(true) + .setDeserializationSchema(new TestDeserializationSchema()) + .build(); + + // Then - source should be created (lake source may be null if table doesn't have datalake) + assertThat(source).isNotNull(); + } + + @Test + public void testSetLakeEnabledWithEarliestOffset() { + // Given - using earliest() offset, lakeEnabled = true + // Lake source won't be enabled since startup mode is not FULL + FlussSource source = + FlussSource.builder() + 
.setBootstrapServers(bootstrapServers) + .setDatabase(DEFAULT_DB) + .setTable(DEFAULT_TABLE_PATH.getTableName()) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setLakeEnabled(true) + .setDeserializationSchema(new TestDeserializationSchema()) + .build(); + + // Then + assertThat(source).isNotNull(); + } + + @Test + public void testLakeEnabledDefaultValue() { + // Given - not setting lakeEnabled, should default to false + FlussSource source = + FlussSource.builder() + .setBootstrapServers(bootstrapServers) + .setDatabase(DEFAULT_DB) + .setTable(DEFAULT_TABLE_PATH.getTableName()) + .setStartingOffsets(OffsetsInitializer.full()) + .setDeserializationSchema(new TestDeserializationSchema()) + .build(); + + // Then + assertThat(source).isNotNull(); + } + // Test record class for tests private static class TestRecord { private int id; diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkDataStreamUnionReadITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkDataStreamUnionReadITCase.java new file mode 100644 index 0000000000..018440a832 --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkDataStreamUnionReadITCase.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.iceberg.flink; + +import org.apache.fluss.client.initializer.OffsetsInitializer; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.flink.source.FlussSource; +import org.apache.fluss.flink.source.FlussSourceBuilder; +import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypes; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for Flink DataStream API union read from lake and Fluss. 
*/ +class FlinkDataStreamUnionReadITCase extends FlinkUnionReadTestBase { + + @BeforeAll + protected static void beforeAll() { + FlinkUnionReadTestBase.beforeAll(); + } + + private String getBootstrapServers() { + return String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); + } + + private StreamExecutionEnvironment createStreamingEnv() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + return env; + } + + private FlussSourceBuilder newFlussSourceBuilder(String tableName) { + return FlussSource.builder() + .setBootstrapServers(getBootstrapServers()) + .setDatabase(DEFAULT_DB) + .setTable(tableName) + .setDeserializationSchema(new RowDataDeserializationSchema()); + } + + @Test + void testLakeEnabledWithoutLakeConfiguration() throws Exception { + String tableName = "ds_no_lake_table"; + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + + // Create a simple log table WITHOUT data lake configuration + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(schema).distributedBy(1, "a").build(); + admin.createTable(tablePath, tableDescriptor, true).get(); + + // Write data + List writtenRows = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + writtenRows.add(row(i, "value_" + i)); + } + writeRows(tablePath, writtenRows, true); + + // Create source with lakeEnabled=true (but table has no lake) + FlussSource flussSource = + newFlussSourceBuilder(tableName) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setLakeEnabled(true) + .build(); + + DataStreamSource stream = + createStreamingEnv() + .fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss DataStream No Lake"); + + // Should still read all data from Fluss + List results = stream.executeAndCollect(writtenRows.size()); + 
assertThat(results.size()).isEqualTo(writtenRows.size()); + } + + @Test + void testLakeDisabledEvenWithLakeTable() throws Exception { + JobClient jobClient = buildTieringJob(execEnv); + + String tableName = "ds_lake_disabled_table"; + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + long tableId = createLogTableWithDataLake(tablePath); + + // Write data and wait for sync to lake + List writtenRows = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + writtenRows.add(row(i, "value_" + i)); + } + writeRows(tablePath, writtenRows, true); + waitUntilBucketSynced(tablePath, tableId, 1, false); + + // Create source with lakeEnabled=false (explicitly disabled) + FlussSource flussSource = + newFlussSourceBuilder(tableName) + .setStartingOffsets(OffsetsInitializer.full()) + .setLakeEnabled(false) + .build(); + + DataStreamSource stream = + createStreamingEnv() + .fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss DataStream Lake Disabled"); + + // Should read from Fluss log, not lake snapshot + List results = stream.executeAndCollect(writtenRows.size()); + assertThat(results.size()).isEqualTo(writtenRows.size()); + + jobClient.cancel().get(); + } + + private long createLogTableWithDataLake(TablePath tablePath) throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(1, "a") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) + .build(); + return createTable(tablePath, tableDescriptor); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkDataStreamUnionReadITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkDataStreamUnionReadITCase.java new file mode 100644 index 
0000000000..ed01a837ba --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkDataStreamUnionReadITCase.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.flink; + +import org.apache.fluss.client.initializer.OffsetsInitializer; +import org.apache.fluss.config.AutoPartitionTimeUnit; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.flink.source.FlussSource; +import org.apache.fluss.flink.source.FlussSourceBuilder; +import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypes; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import 
org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for Flink DataStream API union read from lake and Fluss. */ +class FlinkDataStreamUnionReadITCase extends FlinkUnionReadTestBase { + + @BeforeAll + protected static void beforeAll() { + FlinkUnionReadTestBase.beforeAll(); + } + + private String getBootstrapServers() { + return String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); + } + + private StreamExecutionEnvironment createBatchEnv() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(2); + return env; + } + + private StreamExecutionEnvironment createStreamingEnv() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + return env; + } + + private FlussSourceBuilder newFlussSourceBuilder(String tableName) { + return FlussSource.builder() + .setBootstrapServers(getBootstrapServers()) + .setDatabase(DEFAULT_DB) + .setTable(tableName) + .setDeserializationSchema(new RowDataDeserializationSchema()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testUnionReadLogTableFromDataStream(boolean isPartitioned) throws Exception { + // Start tiering job (requires STREAMING mode) + JobClient jobClient = buildTieringJob(execEnv); + + String tableName = "ds_logTable_" + (isPartitioned ? 
"partitioned" : "non_partitioned"); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + + // Create log table with data lake enabled and write initial data + long tableId = createLogTableWithDataLake(tablePath, isPartitioned); + List writtenRows = new ArrayList<>(); + if (isPartitioned) { + Map partitionNameById = waitUntilPartitions(tablePath); + for (String partition : partitionNameById.values()) { + for (int i = 0; i < 3; i++) { + writtenRows.add(row(100 + i, "value_" + i, partition)); + } + } + } else { + for (int i = 0; i < 5; i++) { + writtenRows.add(row(100 + i, "value_" + i)); + } + } + writeRows(tablePath, writtenRows, true); + + // Wait until records have been synced to lake + waitUntilBucketSynced(tablePath, tableId, 1, isPartitioned); + + // Create FlussSource with union read enabled and read using BATCH mode + FlussSource flussSource = + newFlussSourceBuilder(tableName) + .setStartingOffsets(OffsetsInitializer.full()) + .setLakeEnabled(true) + .setStreaming(false) + .build(); + + DataStreamSource stream = + createBatchEnv() + .fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss DataStream Union Read"); + + // Verify all written rows are read + List results = stream.executeAndCollect(writtenRows.size()); + assertThat(results.size()).isEqualTo(writtenRows.size()); + + jobClient.cancel().get(); + } + + @Test + void testLakeEnabledWithoutLakeConfiguration() throws Exception { + String tableName = "ds_no_lake_table"; + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + + // Create a simple log table WITHOUT data lake configuration + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(schema).distributedBy(1, "a").build(); + admin.createTable(tablePath, tableDescriptor, true).get(); + + // Write data + List writtenRows = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + 
writtenRows.add(row(i, "value_" + i)); + } + writeRows(tablePath, writtenRows, true); + + // Create source with lakeEnabled=true (but table has no lake) + FlussSource flussSource = + newFlussSourceBuilder(tableName) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setLakeEnabled(true) + .build(); + + DataStreamSource stream = + createStreamingEnv() + .fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss DataStream No Lake"); + + // Should still read all data from Fluss + List results = stream.executeAndCollect(writtenRows.size()); + assertThat(results.size()).isEqualTo(writtenRows.size()); + } + + @Test + void testLakeDisabledEvenWithLakeTable() throws Exception { + JobClient jobClient = buildTieringJob(execEnv); + + String tableName = "ds_lake_disabled_table"; + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + long tableId = createLogTableWithDataLake(tablePath, false); + + // Write data and wait for sync to lake + List writtenRows = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + writtenRows.add(row(i, "value_" + i)); + } + writeRows(tablePath, writtenRows, true); + waitUntilBucketSynced(tablePath, tableId, 1, false); + + // Create source with lakeEnabled=false (explicitly disabled) + FlussSource flussSource = + newFlussSourceBuilder(tableName) + .setStartingOffsets(OffsetsInitializer.full()) + .setLakeEnabled(false) + .build(); + + DataStreamSource stream = + createStreamingEnv() + .fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss DataStream Lake Disabled"); + + // Should read from Fluss log, not lake snapshot + List results = stream.executeAndCollect(writtenRows.size()); + assertThat(results.size()).isEqualTo(writtenRows.size()); + + jobClient.cancel().get(); + } + + private long createLogTableWithDataLake(TablePath tablePath, boolean isPartitioned) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()); + + 
TableDescriptor.Builder tableBuilder = + TableDescriptor.builder() + .distributedBy(1, "a") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + + if (isPartitioned) { + schemaBuilder.column("c", DataTypes.STRING()); + tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); + tableBuilder.partitionedBy("c"); + tableBuilder.property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR); + } + tableBuilder.schema(schemaBuilder.build()); + return createTable(tablePath, tableBuilder.build()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testUnionReadPrimaryKeyTableFromDataStream(boolean isPartitioned) throws Exception { + JobClient jobClient = buildTieringJob(execEnv); + + String tableName = "ds_pkTable_" + (isPartitioned ? "partitioned" : "non_partitioned"); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + long tableId = createPkTableWithDataLake(tablePath, isPartitioned); + + // Write initial data (keys 1, 2, 3) + List initialRows = new ArrayList<>(); + if (isPartitioned) { + Map partitionNameById = waitUntilPartitions(tablePath); + for (String partition : partitionNameById.values()) { + initialRows.add(row(1, BinaryString.fromString("value_1"), partition)); + initialRows.add(row(2, BinaryString.fromString("value_2"), partition)); + initialRows.add(row(3, BinaryString.fromString("value_3"), partition)); + } + } else { + initialRows.add(row(1, BinaryString.fromString("value_1"))); + initialRows.add(row(2, BinaryString.fromString("value_2"))); + initialRows.add(row(3, BinaryString.fromString("value_3"))); + } + writeRows(tablePath, initialRows, false); + + // Wait for sync to lake + waitUntilBucketSynced(tablePath, tableId, 1, isPartitioned); + + // Write MORE data AFTER sync: update key 2, add key 4 (will be in log, not lake) + List newRows = new ArrayList<>(); + if (isPartitioned) { + Map 
partitionNameById = waitUntilPartitions(tablePath); + for (String partition : partitionNameById.values()) { + newRows.add(row(2, BinaryString.fromString("value_2_updated"), partition)); + newRows.add(row(4, BinaryString.fromString("value_4"), partition)); + } + } else { + newRows.add(row(2, BinaryString.fromString("value_2_updated"))); + newRows.add(row(4, BinaryString.fromString("value_4"))); + } + writeRows(tablePath, newRows, false); + + // Create FlussSource with union read enabled + FlussSource flussSource = + newFlussSourceBuilder(tableName) + .setStartingOffsets(OffsetsInitializer.full()) + .setLakeEnabled(true) + .setStreaming(false) + .build(); + + DataStreamSource stream = + createBatchEnv() + .fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss DataStream PK Union Read"); + + // Collect and verify: 4 distinct keys per partition (key 2 deduplicated to latest) + int expectedCount = isPartitioned ? 4 * waitUntilPartitions(tablePath).size() : 4; + List results = stream.executeAndCollect(expectedCount); + assertThat(results.size()).isEqualTo(expectedCount); + + // Verify values - key 2 should be updated, original value should NOT be present + Set actualValues = new HashSet<>(); + for (RowData row : results) { + actualValues.add(row.getString(1).toString()); + } + assertThat(actualValues).contains("value_1", "value_2_updated", "value_3", "value_4"); + assertThat(actualValues).doesNotContain("value_2"); + + jobClient.cancel().get(); + } + + private long createPkTableWithDataLake(TablePath tablePath, boolean isPartitioned) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()); + + TableDescriptor.Builder tableBuilder = + TableDescriptor.builder() + .distributedBy(1) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + + if (isPartitioned) { + schemaBuilder.column("c", 
DataTypes.STRING()); + tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); + tableBuilder.partitionedBy("c"); + tableBuilder.property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR); + schemaBuilder.primaryKey("c", "a"); // partition key + primary key + } else { + schemaBuilder.primaryKey("a"); + } + + tableBuilder.schema(schemaBuilder.build()); + return createTable(tablePath, tableBuilder.build()); + } +} diff --git a/website/docs/engine-flink/datastream.mdx b/website/docs/engine-flink/datastream.mdx index 3d2910a31d..bef300fb5a 100644 --- a/website/docs/engine-flink/datastream.mdx +++ b/website/docs/engine-flink/datastream.mdx @@ -82,6 +82,90 @@ DataStreamSource stream = stream.print(); ``` +### Union Read (Lake + Fluss) + +Union Read combines historical data from a data lake snapshot (e.g., Paimon) with real-time data from Fluss logs, giving you a complete and up-to-date view of your data. + +#### Prerequisites + +- The table must be created with `'table.datalake.enabled' = 'true'` +- The corresponding lake connector JAR must be available, depending on your data lake type: + - Paimon: `fluss-lake-paimon-$FLUSS_VERSION$.jar` + - Iceberg: `fluss-lake-iceberg-$FLUSS_VERSION$.jar` +- At least one lake snapshot must exist (the tiering job must have run at least once) + +#### Configuration + +To enable union read in DataStream API, use the following key methods: + +- **setLakeEnabled(true):** Enables union read mode +- **setStartingOffsets(OffsetsInitializer.full()):** Required for union read (this is also the default) +- **setStreaming(false):** Required when running in BATCH mode + +:::note +When running in Flink BATCH mode, you must explicitly call `setStreaming(false)` on the source builder. Otherwise, the source will be treated as an unbounded source, which is incompatible with batch execution. 
+::: + +#### BATCH Mode Example + +In batch mode, the source reads the lake snapshot merged with the latest Fluss data and then terminates. + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setRuntimeMode(RuntimeExecutionMode.BATCH); + +FlussSource flussSource = FlussSource.builder() + .setBootstrapServers("localhost:9123") + .setDatabase("mydb") + .setTable("orders") + .setLakeEnabled(true) // Enable union read + .setStartingOffsets(OffsetsInitializer.full()) // Required (default) + .setStreaming(false) // Required for BATCH mode + .setDeserializationSchema(new RowDataDeserializationSchema()) + .build(); + +DataStreamSource stream = env.fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss Union Read Source" +); + +stream.print(); +env.execute("Union Read Batch Job"); +``` + +#### STREAMING Mode Example + +In streaming mode, the source first reads the lake snapshot, then continues to consume real-time changes from Fluss logs indefinitely. + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + +FlussSource flussSource = FlussSource.builder() + .setBootstrapServers("localhost:9123") + .setDatabase("mydb") + .setTable("orders") + .setLakeEnabled(true) // Enable union read + .setStartingOffsets(OffsetsInitializer.full()) // Required (default) + // setStreaming(false) is NOT needed for streaming mode + .setDeserializationSchema(new RowDataDeserializationSchema()) + .build(); + +DataStreamSource stream = env.fromSource( + flussSource, + WatermarkStrategy.noWatermarks(), + "Fluss Union Read Source" +); + +stream.print(); +env.execute("Union Read Streaming Job"); +``` + +#### Primary Key Table Deduplication + +When performing union read on a **Primary Key Table**, records with the same key that exist in both the lake snapshot and Fluss logs are automatically deduplicated. 
The most recent value (from Fluss) takes precedence, ensuring data consistency without manual intervention. + ### Configuration Options The `FlussSourceBuilder` provides several methods for configuring the source connector: