2020import org .apache .flink .api .connector .source .SourceOutput ;
2121import org .apache .flink .cdc .common .event .CreateTableEvent ;
2222import org .apache .flink .cdc .common .event .Event ;
23+ import org .apache .flink .cdc .common .event .SchemaChangeEvent ;
2324import org .apache .flink .cdc .common .schema .Schema ;
24- import org .apache .flink .cdc .common .types .DataType ;
2525import org .apache .flink .cdc .connectors .base .options .StartupOptions ;
2626import org .apache .flink .cdc .connectors .base .source .meta .offset .OffsetFactory ;
2727import org .apache .flink .cdc .connectors .base .source .meta .split .SnapshotSplit ;
2828import org .apache .flink .cdc .connectors .base .source .meta .split .SourceSplitBase ;
2929import org .apache .flink .cdc .connectors .base .source .meta .split .SourceSplitState ;
3030import org .apache .flink .cdc .connectors .base .source .metrics .SourceReaderMetrics ;
31- import org .apache .flink .cdc .connectors .base .source .reader .IncrementalSourceRecordEmitter ;
3231import org .apache .flink .cdc .connectors .postgres .source .PostgresDialect ;
3332import org .apache .flink .cdc .connectors .postgres .source .config .PostgresSourceConfig ;
33+ import org .apache .flink .cdc .connectors .postgres .source .schema .PostgresSchemaRecord ;
3434import org .apache .flink .cdc .connectors .postgres .source .utils .TableDiscoveryUtils ;
3535import org .apache .flink .cdc .connectors .postgres .utils .PostgresSchemaUtils ;
36- import org .apache .flink .cdc .connectors .postgres .utils .PostgresTypeUtils ;
3736import org .apache .flink .cdc .debezium .DebeziumDeserializationSchema ;
3837import org .apache .flink .cdc .debezium .event .DebeziumEventDeserializationSchema ;
3938import org .apache .flink .connector .base .source .reader .RecordEmitter ;
4039
4140import io .debezium .connector .postgresql .connection .PostgresConnection ;
4241import io .debezium .data .Envelope ;
43- import io .debezium .relational .Column ;
4442import io .debezium .relational .Table ;
4543import io .debezium .relational .TableId ;
4644import io .debezium .relational .history .TableChanges ;
4745import org .apache .kafka .connect .data .Field ;
4846import org .apache .kafka .connect .data .Struct ;
4947import org .apache .kafka .connect .source .SourceRecord ;
48+ import org .slf4j .Logger ;
49+ import org .slf4j .LoggerFactory ;
5050
5151import java .sql .SQLException ;
5252import java .util .HashMap ;
5353import java .util .HashSet ;
5454import java .util .List ;
5555import java .util .Map ;
56- import java .util .Objects ;
5756import java .util .Set ;
5857
5958import static io .debezium .connector .AbstractSourceInfo .SCHEMA_NAME_KEY ;
6261import static org .apache .flink .cdc .connectors .base .utils .SourceRecordUtils .isDataChangeRecord ;
6362import static org .apache .flink .cdc .connectors .base .utils .SourceRecordUtils .isSchemaChangeEvent ;
6463import static org .apache .flink .cdc .connectors .postgres .utils .PostgresSchemaUtils .toCdcTableId ;
64+ import static org .apache .flink .cdc .connectors .postgres .utils .SchemaChangeUtil .inferSchemaChangeEvent ;
65+ import static org .apache .flink .cdc .connectors .postgres .utils .SchemaChangeUtil .toCreateTableEvent ;
6566
6667/** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */
67- public class PostgresPipelineRecordEmitter <T > extends IncrementalSourceRecordEmitter <T > {
68+ public class PostgresPipelineRecordEmitter <T > extends PostgresSourceRecordEmitter <T > {
69+ private static final Logger LOG = LoggerFactory .getLogger (PostgresPipelineRecordEmitter .class );
6870 private final PostgresSourceConfig sourceConfig ;
6971 private final PostgresDialect postgresDialect ;
7072
7173 // Used when startup mode is initial
72- private Set <TableId > alreadySendCreateTableTables ;
74+ private final Set <TableId > alreadySendCreateTableTables ;
75+ private final boolean isBounded ;
76+ private final boolean includeDatabaseInTableId ;
77+ private final Map <TableId , CreateTableEvent > createTableEventCache ;
7378
7479 // Used when startup mode is not initial
7580 private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true ;
76- private boolean isBounded = false ;
77- private boolean includeDatabaseInTableId = false ;
78-
79- private final Map <TableId , CreateTableEvent > createTableEventCache ;
8081
8182 public PostgresPipelineRecordEmitter (
82- DebeziumDeserializationSchema debeziumDeserializationSchema ,
83+ DebeziumDeserializationSchema < T > debeziumDeserializationSchema ,
8384 SourceReaderMetrics sourceReaderMetrics ,
8485 PostgresSourceConfig sourceConfig ,
8586 OffsetFactory offsetFactory ,
@@ -108,16 +109,11 @@ public void applySplit(SourceSplitBase split) {
108109 } else {
109110 for (Map .Entry <TableId , TableChanges .TableChange > entry :
110111 split .getTableSchemas ().entrySet ()) {
111- TableId tableId =
112- entry .getKey (); // Use the TableId from the map key which contains full info
113112 TableChanges .TableChange tableChange = entry .getValue ();
113+
114+ Table table = tableChange .getTable ();
114115 CreateTableEvent createTableEvent =
115- new CreateTableEvent (
116- toCdcTableId (
117- tableId ,
118- sourceConfig .getDatabaseList ().get (0 ),
119- includeDatabaseInTableId ),
120- buildSchemaFromTable (tableChange .getTable ()));
116+ toCreateTableEvent (table , sourceConfig , postgresDialect );
121117 ((DebeziumEventDeserializationSchema ) debeziumDeserializationSchema )
122118 .applyChangeEvent (createTableEvent );
123119 }
@@ -141,60 +137,43 @@ protected void processElement(
141137 sendCreateTableEvent (tableId , (SourceOutput <Event >) output );
142138 alreadySendCreateTableTables .add (tableId );
143139 }
144- } else {
145- boolean isDataChangeRecord = isDataChangeRecord (element );
146- if (isDataChangeRecord || isSchemaChangeEvent (element )) {
147- TableId tableId = getTableId (element );
148- if (!alreadySendCreateTableTables .contains (tableId )) {
149- CreateTableEvent createTableEvent = createTableEventCache .get (tableId );
150- if (createTableEvent != null ) {
151- output .collect ((T ) createTableEvent );
152- }
153- alreadySendCreateTableTables .add (tableId );
154- }
155- // In rare case, we may miss some CreateTableEvents before DataChangeEvents.
156- // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
157- if (isDataChangeRecord && !createTableEventCache .containsKey (tableId )) {
158- CreateTableEvent createTableEvent = getCreateTableEvent (sourceConfig , tableId );
140+ } else if (isDataChangeRecord (element )) {
141+ TableId tableId = getTableId (element );
142+ if (!alreadySendCreateTableTables .contains (tableId )) {
143+ CreateTableEvent createTableEvent = createTableEventCache .get (tableId );
144+ if (createTableEvent != null ) {
159145 output .collect ((T ) createTableEvent );
160- createTableEventCache .put (tableId , createTableEvent );
161146 }
147+ alreadySendCreateTableTables .add (tableId );
148+ }
149+ // In rare case, we may miss some CreateTableEvents before DataChangeEvents.
150+ // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
151+ if (!createTableEventCache .containsKey (tableId )) {
152+ CreateTableEvent createTableEvent = getCreateTableEvent (sourceConfig , tableId );
153+ output .collect ((T ) createTableEvent );
154+ createTableEventCache .put (tableId , createTableEvent );
162155 }
156+ } else if (isSchemaChangeEvent (element ) && sourceConfig .isSchemaChangeEnabled ()) {
157+ handleSchemaChangeRecord (element , output , splitState );
163158 }
164159 super .processElement (element , output , splitState );
165160 }
166161
167- private Schema buildSchemaFromTable (Table table ) {
168- List <Column > columns = table .columns ();
169- Schema .Builder tableBuilder = Schema .newBuilder ();
170- for (int i = 0 ; i < columns .size (); i ++) {
171- Column column = columns .get (i );
172-
173- String colName = column .name ();
174- DataType dataType ;
175- try (PostgresConnection jdbc = postgresDialect .openJdbcConnection ()) {
176- dataType =
177- PostgresTypeUtils .fromDbzColumn (
178- column ,
179- this .sourceConfig .getDbzConnectorConfig (),
180- jdbc .getTypeRegistry ());
181- }
182- if (!column .isOptional ()) {
183- dataType = dataType .notNull ();
184- }
185- tableBuilder .physicalColumn (
186- colName ,
187- dataType ,
188- column .comment (),
189- column .defaultValueExpression ().orElse (null ));
190- }
191- tableBuilder .comment (table .comment ());
192-
193- List <String > primaryKey = table .primaryKeyColumnNames ();
194- if (Objects .nonNull (primaryKey ) && !primaryKey .isEmpty ()) {
195- tableBuilder .primaryKey (primaryKey );
// NOTE(review): this file is a rendered git diff; the "162+"/"197-"/"196171" prefixes are
// diff line markers, not Java tokens. The comments below document the ADDED method
// (new lines 162-177) as it will exist after the patch is applied.
//
// handleSchemaChangeRecord: infers and emits pipeline SchemaChangeEvents for a Debezium
// schema-change record. The incoming SourceRecord is downcast to PostgresSchemaRecord
// (callers gate on isSchemaChangeEvent(element) in processElement, per this diff's hunk
// at new line 156 — TODO confirm the cast can never see another record type).
162+ private void handleSchemaChangeRecord (
163+ SourceRecord element , SourceOutput <T > output , SourceSplitState splitState ) {
// Schemas currently known to the split state; used as the "before" side of the diff.
164+ Map <TableId , TableChanges .TableChange > existedTableSchemas =
165+ splitState .toSourceSplit ().getTableSchemas ();
166+ PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord ) element ;
// "after" schema comes from the record itself; "before" stays null when the table was
// not previously cached (first sighting), letting inferSchemaChangeEvent treat it as new.
167+ Table schemaAfter = schemaRecord .getTable ();
168+ Table schemaBefore = null ;
169+ if (existedTableSchemas .containsKey (schemaAfter .id ())) {
170+ schemaBefore = existedTableSchemas .get (schemaAfter .id ()).getTable ();
196171 }
// (removed line below belongs to the deleted buildSchemaFromTable method — diff artifact)
197- return tableBuilder .build ();
// Delegate the before/after comparison to SchemaChangeUtil.inferSchemaChangeEvent
// (statically imported per new line 64 of this diff), then forward every inferred
// event downstream. NOTE(review): the unchecked (T) cast mirrors the existing
// processElement style; T is assumed to be Event here — confirm against the class's
// instantiation sites.
172+ List <SchemaChangeEvent > schemaChangeEvents =
173+ inferSchemaChangeEvent (
174+ schemaAfter .id (), schemaBefore , schemaAfter , sourceConfig , postgresDialect );
175+ LOG .info ("Inferred Schema change events: {}" , schemaChangeEvents );
176+ schemaChangeEvents .forEach (schemaChangeEvent -> output .collect ((T ) schemaChangeEvent ));
198177 }
199178
200179 private void sendCreateTableEvent (TableId tableId , SourceOutput <Event > output ) {
0 commit comments