Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.PLUGIN_NAME;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SNAPSHOT_MODE;
import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection;
Expand Down Expand Up @@ -125,24 +126,42 @@ public void configure(SourceSplitBase sourceSplitBase) {
LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
if (sourceSplitBase instanceof SnapshotSplit) {
dbzConfig =
new PostgresConnectorConfig(
dbzConfig
.getConfig()
.edit()
.with(
"table.include.list",
getTableList(
((SnapshotSplit) sourceSplitBase).getTableId()))
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
.getSlotNameForBackfillTask())
// drop slot for backfill stream split
.with(DROP_SLOT_ON_STOP.name(), true)
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build());

Configuration.Builder builder =
dbzConfig
.getConfig()
.edit()
.with(
"table.include.list",
getTableList(((SnapshotSplit) sourceSplitBase).getTableId()))
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
.getSlotNameForBackfillTask())
// drop slot for backfill stream split
.with(DROP_SLOT_ON_STOP.name(), true)
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0);

try {
String autoCreateMode =
dbzConfig.getConfig().getString(PUBLICATION_AUTOCREATE_MODE.name());
if ("filtered".equalsIgnoreCase(autoCreateMode)) {
// disable publication auto create for snapshot split when auto create mode is
// filtered
// Prevent backfill slots from modifying the shared publication.
// Without this, each backfill slot would SET the publication to only include
// its current table
builder =
builder.with(
PUBLICATION_AUTOCREATE_MODE.name(),
PostgresConnectorConfig.AutoCreateMode.DISABLED);
}
} catch (Exception e) {
// ignore
}

dbzConfig = new PostgresConnectorConfig(builder.build());
} else {

Configuration.Builder builder = dbzConfig.getConfig().edit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,23 @@ void testReadMultipleTableWithSingleParallelism(String scanStartupMode) throws E
scanStartupMode);
}

@Test
void testFilteredPublication() throws Exception {
    // Use a shared "filtered" publication together with a small chunk size so that
    // several snapshot splits (and thus backfill slots) are created during the run.
    final Map<String, String> sourceOptions = new HashMap<>();
    sourceOptions.put("scan.incremental.snapshot.chunk.size", "10");
    sourceOptions.put("debezium.publication.autocreate.mode", "filtered");

    // Three capture tables; the stream-phase checker validates snapshot + stream data itself,
    // so the generic snapshot check is disabled (last argument).
    final String[] capturedTables = {"Customers", "customers_1", "customers_2"};
    testPostgresParallelSource(
            1,
            DEFAULT_SCAN_STARTUP_MODE,
            PostgresTestUtils.FailoverType.NONE,
            PostgresTestUtils.FailoverPhase.NEVER,
            capturedTables,
            RestartStrategies.fixedDelayRestart(1, 0),
            sourceOptions,
            this::checkStreamDataForFilteredPublication,
            false);
}

@ParameterizedTest
@ValueSource(strings = {"initial", "latest-offset"})
void testReadMultipleTableWithMultipleParallelism(String scanStartupMode) throws Exception {
Expand Down Expand Up @@ -833,6 +850,29 @@ private void testPostgresParallelSource(
Map<String, String> otherOptions,
StreamDataChecker streamDataChecker)
throws Exception {
testPostgresParallelSource(
parallelism,
scanStartupMode,
failoverType,
failoverPhase,
captureCustomerTables,
restartStrategyConfiguration,
otherOptions,
streamDataChecker,
true);
}

private void testPostgresParallelSource(
int parallelism,
String scanStartupMode,
PostgresTestUtils.FailoverType failoverType,
PostgresTestUtils.FailoverPhase failoverPhase,
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
Map<String, String> otherOptions,
StreamDataChecker streamDataChecker,
boolean checkSnapshot)
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand All @@ -844,7 +884,7 @@ private void testPostgresParallelSource(
TableResult tableResult = tEnv.executeSql("select * from customers");

// first step: check the snapshot data
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
if (checkSnapshot && DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
}

Expand Down Expand Up @@ -963,6 +1003,103 @@ private void checkSnapshotData(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
}

/**
 * Verifies that an INSERT issued while a later table's snapshot split is still being read is
 * captured in the stream phase when {@code publication.autocreate.mode=filtered}.
 *
 * <p>Flow: drain all snapshot rows; once the collected count suggests the third table's
 * snapshot has started, fire an INSERT into {@code customer.Customers}; after the snapshot
 * completes, poll the stream phase and assert the inserted row eventually appears.
 *
 * @param tableResult result of the running {@code SELECT * FROM customers} query
 * @param failoverType unused here; present to match the {@code StreamDataChecker} signature
 * @param failoverPhase unused here; present to match the {@code StreamDataChecker} signature
 * @param captureCustomerTables captured tables; each is expected to contain the same 21 seed rows
 * @throws Exception if the job fails, the database call fails, or a sleep is interrupted
 */
private void checkStreamDataForFilteredPublication(
        TableResult tableResult,
        PostgresTestUtils.FailoverType failoverType,
        PostgresTestUtils.FailoverPhase failoverPhase,
        String[] captureCustomerTables)
        throws Exception {
    waitUntilJobRunning(tableResult);
    CloseableIterator<Row> iterator = tableResult.collect();
    Optional<JobClient> optionalJobClient = tableResult.getJobClient();
    assertThat(optionalJobClient).isPresent();

    // The 21 rows seeded into each customer table by the SQL fixture.
    String[] snapshotForSingleTable =
            new String[] {
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[1019, user_20, Shanghai, 123567891234]",
                "+I[2000, user_21, Shanghai, 123567891234]"
            };

    int recordsPerTable = snapshotForSingleTable.length;
    int totalSnapshotRecords = recordsPerTable * captureCustomerTables.length;
    int recordsFromFirstTwoTables = recordsPerTable * 2;
    // Heuristic: seeing one row beyond the first two tables' totals implies the third
    // table's snapshot has begun. NOTE(review): assumes rows arrive roughly table-by-table
    // with parallelism 1 — confirm against the split assignment order.
    int recordsWhenTable3Starts = recordsFromFirstTwoTables + 1;

    List<String> collectedSnapshotData = new ArrayList<>();
    boolean insertedDuringSnapshot = false;

    // Drain exactly the expected number of snapshot rows before checking them.
    while (iterator.hasNext() && collectedSnapshotData.size() < totalSnapshotRecords) {
        Row row = iterator.next();
        String rowStr = row.toString();
        collectedSnapshotData.add(rowStr);

        // Insert test record right after Table 3 starts its snapshot phase.
        // This ensures the insert happens while Table 3's backfill slot may have altered the
        // publication (the regression being tested).
        if (collectedSnapshotData.size() == recordsWhenTable3Starts
                && !insertedDuringSnapshot) {
            Thread.sleep(1000L); // Give Table 3's backfill slot time to alter the publication
            try (PostgresConnection connection = getConnection()) {
                connection.setAutoCommit(false);
                connection.execute(
                        "INSERT INTO customer.Customers VALUES(9999, 'test_user', 'TestCity', '123456789')");
                connection.commit();
            }
            insertedDuringSnapshot = true;
        }
    }

    // Every capture table is expected to emit the same 21-row snapshot.
    List<String> expectedSnapshotData = new ArrayList<>();
    for (int i = 0; i < captureCustomerTables.length; i++) {
        expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
    }

    assertEqualsInAnyOrder(expectedSnapshotData, collectedSnapshotData);

    // Give the job time to transition from snapshot to stream reading.
    Thread.sleep(15000L);

    // Poll the stream phase for up to 30s (or 100 records) looking for the inserted row.
    List<String> collectedStreamData = new ArrayList<>();
    long startTime = System.currentTimeMillis();
    long timeout = 30000L;
    int maxRecords = 100;

    while (collectedStreamData.size() < maxRecords
            && (System.currentTimeMillis() - startTime) < timeout) {
        // hasNextData presumably performs a non-blocking availability check — see helper.
        if (hasNextData(iterator)) {
            String record = iterator.next().toString();
            collectedStreamData.add(record);
        } else {
            Thread.sleep(1000L);
        }
    }

    // The row inserted during Table 3's snapshot must survive into the stream phase; if a
    // backfill slot had reset the filtered publication, this event would be lost.
    boolean insertFound =
            collectedStreamData.contains("+I[9999, test_user, TestCity, 123456789]");
    assertThat(insertFound)
            .as(
                    "The insert into Customers table during Table 3 snapshot phase should be captured.")
            .isTrue();
}

private void checkStreamData(
TableResult tableResult,
PostgresTestUtils.FailoverType failoverType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,37 @@ VALUES (101,'user_1','Shanghai','123567891234'),
(1019,'user_20','Shanghai','123567891234'),
(2000,'user_21','Shanghai','123567891234');

-- Third capture table used by the filtered-publication test; mirrors the schema
-- and the 21-row seed data of the other customer tables.
CREATE TABLE customers_2 (
"Id" INTEGER NOT NULL PRIMARY KEY,
"Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
-- FULL replica identity so UPDATE/DELETE events carry complete old-row images.
ALTER TABLE customers_2 REPLICA IDENTITY FULL;

-- Same 21 seed rows as the sibling customer tables (tests assert on these values).
INSERT INTO customers_2
VALUES (101,'user_1','Shanghai','123567891234'),
(102,'user_2','Shanghai','123567891234'),
(103,'user_3','Shanghai','123567891234'),
(109,'user_4','Shanghai','123567891234'),
(110,'user_5','Shanghai','123567891234'),
(111,'user_6','Shanghai','123567891234'),
(118,'user_7','Shanghai','123567891234'),
(121,'user_8','Shanghai','123567891234'),
(123,'user_9','Shanghai','123567891234'),
(1009,'user_10','Shanghai','123567891234'),
(1010,'user_11','Shanghai','123567891234'),
(1011,'user_12','Shanghai','123567891234'),
(1012,'user_13','Shanghai','123567891234'),
(1013,'user_14','Shanghai','123567891234'),
(1014,'user_15','Shanghai','123567891234'),
(1015,'user_16','Shanghai','123567891234'),
(1016,'user_17','Shanghai','123567891234'),
(1017,'user_18','Shanghai','123567891234'),
(1018,'user_19','Shanghai','123567891234'),
(1019,'user_20','Shanghai','123567891234'),
(2000,'user_21','Shanghai','123567891234');

CREATE TABLE customers_no_pk (
"Id" INTEGER NOT NULL,
"Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
Expand Down
Loading