Skip to content

Commit ad0ddea

Browse files
committed
[FLINK-38840][postgres] Spotless fixes
1 parent 9ed8e4a commit ad0ddea

2 files changed

Lines changed: 34 additions & 26 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -127,37 +127,41 @@ public void configure(SourceSplitBase sourceSplitBase) {
127127
PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
128128
if (sourceSplitBase instanceof SnapshotSplit) {
129129

130-
Configuration.Builder builder = dbzConfig
131-
.getConfig()
132-
.edit()
133-
.with(
134-
"table.include.list",
135-
getTableList(
136-
((SnapshotSplit) sourceSplitBase).getTableId()))
137-
.with(
138-
SLOT_NAME.name(),
139-
((PostgresSourceConfig) sourceConfig)
140-
.getSlotNameForBackfillTask())
141-
// drop slot for backfill stream split
142-
.with(DROP_SLOT_ON_STOP.name(), true)
143-
// Disable heartbeat event in snapshot split fetcher
144-
.with(Heartbeat.HEARTBEAT_INTERVAL, 0);
130+
Configuration.Builder builder =
131+
dbzConfig
132+
.getConfig()
133+
.edit()
134+
.with(
135+
"table.include.list",
136+
getTableList(((SnapshotSplit) sourceSplitBase).getTableId()))
137+
.with(
138+
SLOT_NAME.name(),
139+
((PostgresSourceConfig) sourceConfig)
140+
.getSlotNameForBackfillTask())
141+
// drop slot for backfill stream split
142+
.with(DROP_SLOT_ON_STOP.name(), true)
143+
// Disable heartbeat event in snapshot split fetcher
144+
.with(Heartbeat.HEARTBEAT_INTERVAL, 0);
145145

146146
try {
147-
String autoCreateMode = dbzConfig.getConfig().getString(PUBLICATION_AUTOCREATE_MODE.name());
147+
String autoCreateMode =
148+
dbzConfig.getConfig().getString(PUBLICATION_AUTOCREATE_MODE.name());
148149
if ("filtered".equalsIgnoreCase(autoCreateMode)) {
149-
// disable publication auto create for snapshot split when auto create mode is filtered
150+
// disable publication auto create for snapshot split when auto create mode is
151+
// filtered
150152
// Prevent backfill slots from modifying the shared publication.
151-
// Without this, each backfill slot would SET the publication to only include its current table
152-
builder = builder.with(PUBLICATION_AUTOCREATE_MODE.name(),
153-
PostgresConnectorConfig.AutoCreateMode.DISABLED);
153+
// Without this, each backfill slot would SET the publication to only include
154+
// its current table
155+
builder =
156+
builder.with(
157+
PUBLICATION_AUTOCREATE_MODE.name(),
158+
PostgresConnectorConfig.AutoCreateMode.DISABLED);
154159
}
155160
} catch (Exception e) {
156161
// ignore
157162
}
158163

159-
dbzConfig =
160-
new PostgresConnectorConfig(builder.build());
164+
dbzConfig = new PostgresConnectorConfig(builder.build());
161165
} else {
162166

163167
Configuration.Builder builder = dbzConfig.getConfig().edit();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,8 +1053,10 @@ private void checkStreamDataForFilteredPublication(
10531053
collectedSnapshotData.add(rowStr);
10541054

10551055
// Insert test record right after Table 3 starts its snapshot phase
1056-
// This ensures the insert happens while Table 3's backfill slot may have altered the publication
1057-
if (collectedSnapshotData.size() == recordsWhenTable3Starts && !insertedDuringSnapshot) {
1056+
// This ensures the insert happens while Table 3's backfill slot may have altered the
1057+
// publication
1058+
if (collectedSnapshotData.size() == recordsWhenTable3Starts
1059+
&& !insertedDuringSnapshot) {
10581060
Thread.sleep(1000L); // Give Table 3's backfill slot time to alter the publication
10591061
try (PostgresConnection connection = getConnection()) {
10601062
connection.setAutoCommit(false);
@@ -1090,9 +1092,11 @@ private void checkStreamDataForFilteredPublication(
10901092
}
10911093
}
10921094

1093-
boolean insertFound = collectedStreamData.contains("+I[9999, test_user, TestCity, 123456789]");
1095+
boolean insertFound =
1096+
collectedStreamData.contains("+I[9999, test_user, TestCity, 123456789]");
10941097
assertThat(insertFound)
1095-
.as("The insert into Customers table during Table 3 snapshot phase should be captured.")
1098+
.as(
1099+
"The insert into Customers table during Table 3 snapshot phase should be captured.")
10961100
.isTrue();
10971101
}
10981102

0 commit comments

Comments
 (0)