Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,25 @@ private int findIndex(int[] array, int target) {
public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
if (logScanFinished) {
if (lakeRecordIterators.isEmpty()) {
List<RecordReader> recordReaders = new ArrayList<>();
if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
|| lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
lakeRecordIterators = Collections.emptyList();
// pass null split to get rowComparator
recordReaders.add(lakeSource.createRecordReader(() -> null));
} else {
for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
lakeRecordIterators.add(
lakeSource.createRecordReader(() -> lakeSplit).read());
recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit));
}
}
for (RecordReader reader : recordReaders) {
if (reader instanceof SortedRecordReader) {
rowComparator = ((SortedRecordReader) reader).order();
} else {
throw new UnsupportedOperationException(
"lake records must instance of sorted view.");
}
lakeRecordIterators.add(reader.read());
}
}
if (currentSortMergeReader == null) {
currentSortMergeReader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
import org.apache.fluss.flink.source.reader.LeaseContext;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.RowType;

Expand Down Expand Up @@ -54,10 +56,16 @@
* }</pre>
*
* @param <OUT> The type of records produced by this source
* @see FlussSourceBuilder
*/
public class FlussSource<OUT> extends FlinkSource<OUT> {
private static final long serialVersionUID = 1L;

/**
* Creates a FlussSource without lake source (backward compatible constructor).
*
* <p>Use {@link #builder()} to create a FlussSource with more configuration options.
*/
FlussSource(
Configuration flussConf,
TablePath tablePath,
Expand All @@ -82,9 +90,41 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
deserializationSchema,
streaming,
null,
null,
LeaseContext.DEFAULT);
}

/**
 * Creates a FlussSource with an optional lake source for union read.
 *
 * <p>All arguments are forwarded to the parent {@code FlinkSource} constructor. When {@code
 * lakeSource} is non-null it is passed through so the source can combine lake (historical) data
 * with Fluss (real-time) data; when it is null the source reads from Fluss only.
 *
 * @param flussConf Fluss client configuration
 * @param tablePath path of the table to read
 * @param hasPrimaryKey whether the table has a primary key
 * @param isPartitioned whether the table is partitioned
 * @param sourceOutputType row type produced by this source (after projection, if any)
 * @param projectedFields indices of projected columns, or null to read all columns
 * @param offsetsInitializer determines the starting offsets for the scan
 * @param scanPartitionDiscoveryIntervalMs interval for discovering new partitions, in ms
 * @param deserializationSchema converts Fluss rows into {@code OUT} records
 * @param streaming true for unbounded streaming mode, false for bounded batch mode
 * @param lakeSource optional lake source enabling union read, may be null
 * @param leaseContext lease context for the source reader
 */
FlussSource(
        Configuration flussConf,
        TablePath tablePath,
        boolean hasPrimaryKey,
        boolean isPartitioned,
        RowType sourceOutputType,
        @Nullable int[] projectedFields,
        OffsetsInitializer offsetsInitializer,
        long scanPartitionDiscoveryIntervalMs,
        FlussDeserializationSchema<OUT> deserializationSchema,
        boolean streaming,
        @Nullable LakeSource<LakeSplit> lakeSource,
        LeaseContext leaseContext) {
    // TODO: Support partition pushDown in datastream
    super(
            flussConf,
            tablePath,
            hasPrimaryKey,
            isPartitioned,
            sourceOutputType,
            projectedFields,
            offsetsInitializer,
            scanPartitionDiscoveryIntervalMs,
            deserializationSchema,
            streaming,
            // NOTE(review): presumably the partition-pushdown argument (see TODO above),
            // deliberately null until supported — confirm against FlinkSource's signature
            null,
            lakeSource,
            leaseContext);
}

/**
* Get a FlussSourceBuilder to build a {@link FlussSource}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
import org.apache.fluss.flink.source.reader.LeaseContext;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.RowType;
Expand All @@ -38,6 +42,7 @@
import java.util.concurrent.ExecutionException;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;

/**
* Builder class for creating {@link FlussSource} instances.
Expand All @@ -60,6 +65,7 @@
* }</pre>
*
* @param <OUT> The type of records produced by the source being built
* @see FlussSource
*/
public class FlussSourceBuilder<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(FlussSourceBuilder.class);
Expand All @@ -77,6 +83,12 @@ public class FlussSourceBuilder<OUT> {
private String database;
private String tableName;

// whether to enable lake source for union read
private boolean lakeEnabled = false;

// whether this is a streaming source (default: true for unbounded streaming mode)
private boolean streaming = true;

/**
* Sets the bootstrap servers for the Fluss source connection.
*
Expand Down Expand Up @@ -116,6 +128,36 @@ public FlussSourceBuilder<OUT> setTable(String table) {
return this;
}

/**
 * Enables or disables lake source for union read.
 *
 * <p>When enabled and the table has data lake configured (e.g., Paimon/Iceberg), the source
 * will read historical data from the lake and real-time data from Fluss. Union read only takes
 * effect when the starting offset is {@link OffsetsInitializer#full()}; with any other starting
 * offset the flag is accepted but union read stays disabled.
 *
 * <p>Defaults to {@code false} when not set.
 *
 * @param lakeEnabled true to enable lake source for union read, false to disable
 * @return this builder
 */
public FlussSourceBuilder<OUT> setLakeEnabled(boolean lakeEnabled) {
    this.lakeEnabled = lakeEnabled;
    return this;
}

/**
 * Sets whether this source should run in streaming mode or batch mode.
 *
 * <p>In streaming mode (the default, {@code true}), the source is unbounded. In batch mode, the
 * source is bounded and terminates after reading all available data. For Flink BATCH execution
 * mode with lake enabled, set this to {@code false}.
 *
 * @param streaming true for streaming (unbounded) mode, false for batch (bounded) mode
 * @return this builder
 */
public FlussSourceBuilder<OUT> setStreaming(boolean streaming) {
    this.streaming = streaming;
    return this;
}

/**
* Sets the scan partition discovery interval in milliseconds.
*
Expand Down Expand Up @@ -288,6 +330,41 @@ public FlussSource<OUT> build() {
? tableInfo.getRowType().project(projectedFields)
: tableInfo.getRowType();

// create lake source if lake is enabled and table has data lake configured
LakeSource<LakeSplit> lakeSource = null;
if (lakeEnabled) {
// check if table has data lake format configured
Map<String, String> tableOptions = new HashMap<>();
tableOptions.putAll(tableInfo.getCustomProperties().toMap());
tableOptions.putAll(tableInfo.getProperties().toMap());

org.apache.fluss.metadata.DataLakeFormat dataLakeFormat =
flussConf.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
if (dataLakeFormat != null) {
// only enable union read when startup mode is FULL
// OffsetsInitializer.full() is the default and enables union read
if (offsetsInitializer instanceof SnapshotOffsetsInitializer) {
try {
lakeSource = createLakeSource(tablePath, tableOptions);
LOG.info("Lake source created for union read on table: {}", tablePath);
} catch (Exception e) {
LOG.warn(
"Failed to create lake source for table {}, union read will be disabled: {}",
tablePath,
e.getMessage());
}
} else {
LOG.info(
"Lake is enabled but startup mode is not FULL, union read is disabled. "
+ "Use OffsetsInitializer.full() to enable union read.");
}
} else {
LOG.info(
"Lake is enabled but table {} does not have data lake configured, union read is disabled.",
tablePath);
}
}

LOG.info("Creating Fluss Source with Configuration: {}", flussConf);

return new FlussSource<>(
Expand All @@ -300,6 +377,8 @@ public FlussSource<OUT> build() {
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
deserializationSchema,
true);
streaming,
lakeSource,
LeaseContext.DEFAULT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,59 @@ public void testProjectedFields() {
assertThat(source).isNotNull();
}

@Test
public void testSetLakeEnabled() {
    // Build with lakeEnabled = true and the default full() starting offsets.
    // Without an actual data lake configured on the table, no lake source is
    // created — this only verifies the builder accepts the flag.
    FlussSourceBuilder<TestRecord> builder =
            FlussSource.<TestRecord>builder()
                    .setBootstrapServers(bootstrapServers)
                    .setDatabase(DEFAULT_DB)
                    .setTable(DEFAULT_TABLE_PATH.getTableName())
                    .setStartingOffsets(OffsetsInitializer.full())
                    .setLakeEnabled(true)
                    .setDeserializationSchema(new TestDeserializationSchema());

    FlussSource<TestRecord> built = builder.build();

    // Source must be created either way (lake source may internally be null
    // when the table has no datalake configured).
    assertThat(built).isNotNull();
}

@Test
public void testSetLakeEnabledWithEarliestOffset() {
    // lakeEnabled = true combined with earliest() offsets: union read should
    // not be activated (startup mode is not FULL), but building must succeed.
    FlussSourceBuilder<TestRecord> builder =
            FlussSource.<TestRecord>builder()
                    .setBootstrapServers(bootstrapServers)
                    .setDatabase(DEFAULT_DB)
                    .setTable(DEFAULT_TABLE_PATH.getTableName())
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setLakeEnabled(true)
                    .setDeserializationSchema(new TestDeserializationSchema());

    FlussSource<TestRecord> built = builder.build();

    assertThat(built).isNotNull();
}

@Test
public void testLakeEnabledDefaultValue() {
    // setLakeEnabled is never called here, so the builder should fall back to
    // its default (false) and still produce a valid source.
    FlussSourceBuilder<TestRecord> builder =
            FlussSource.<TestRecord>builder()
                    .setBootstrapServers(bootstrapServers)
                    .setDatabase(DEFAULT_DB)
                    .setTable(DEFAULT_TABLE_PATH.getTableName())
                    .setStartingOffsets(OffsetsInitializer.full())
                    .setDeserializationSchema(new TestDeserializationSchema());

    FlussSource<TestRecord> built = builder.build();

    assertThat(built).isNotNull();
}

// Test record class for tests
private static class TestRecord {
private int id;
Expand Down
Loading