From cefcbe0ae56d27685d38b65d9d71f548ce0b1eee Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 28 Feb 2026 21:20:12 +0800 Subject: [PATCH 1/2] [FLINK-39071][oracle-cdc] Fix Oracle pipeline connector wrong field case --- .../oracle/utils/OracleSchemaUtils.java | 4 ++-- .../oracle/source/OraclePipelineITCase.java | 18 +++++++++--------- .../cdc/pipeline/tests/OracleE2eITCase.java | 6 +++--- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java index e81c4110cfb..d6fc6437479 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java @@ -158,7 +158,7 @@ public static Schema getSchema( DataType dataType = OracleTypeUtils.fromDbzColumn(column); org.apache.flink.cdc.common.schema.Column cdcColumn = org.apache.flink.cdc.common.schema.Column.physicalColumn( - column.name().toLowerCase(Locale.ROOT), dataType); + column.name(), dataType); list.add(cdcColumn); } return Schema.newBuilder().setColumns(list).primaryKey(pks).build(); @@ -181,7 +181,7 @@ public static List getTablePks( while (rs.next()) { String columnName; columnName = rs.getString(1); - list.add(columnName.toLowerCase(Locale.ROOT)); + list.add(columnName); } return list; }); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java index cd90caad827..6f13b7562ef 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java @@ -981,10 +981,10 @@ public void testGeometryType() throws Exception { new CreateTableEvent( TableId.tableId("DEBEZIUM", "MYLAKE"), Schema.newBuilder() - .physicalColumn("feature_id", DataTypes.BIGINT().notNull()) - .physicalColumn("name", DataTypes.VARCHAR(32)) - .physicalColumn("shape", DataTypes.STRING()) - .primaryKey(Arrays.asList("feature_id")) + .physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull()) + .physicalColumn("NAME", DataTypes.VARCHAR(32)) + .physicalColumn("SHAPE", DataTypes.STRING()) + .primaryKey(Arrays.asList("FEATURE_ID")) .build()); RowType rowType = @@ -1568,11 +1568,11 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { return new CreateTableEvent( tableId, Schema.newBuilder() - .physicalColumn("id", DataTypes.BIGINT().notNull()) - .physicalColumn("name", DataTypes.VARCHAR(255).notNull()) - .physicalColumn("description", DataTypes.VARCHAR(512)) - .physicalColumn("weight", DataTypes.FLOAT()) - .primaryKey(Collections.singletonList("id")) + .physicalColumn("ID", DataTypes.BIGINT().notNull()) + .physicalColumn("NAME", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512)) + .physicalColumn("WEIGHT", DataTypes.FLOAT()) + .primaryKey(Collections.singletonList("ID")) .build()); } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java index 0be4734e79c..e6b5387a224 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java @@ -150,7 +150,7 @@ void testSyncWholeDatabase() throws Exception { Statement stat = conn.createStatement()) { waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}"); waitUntilSpecificEvent( @@ -171,7 +171,7 @@ void testSyncWholeDatabase() throws Exception { "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( @@ -225,7 +225,7 @@ void testSyncWholeDatabase() throws Exception { waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( From 844da5c9381b2772bb365ff8c44dcb160ad31100 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Tue, 24 Mar 2026 22:47:56 +0800 Subject: [PATCH 2/2] [FLINK-39171][oracle-cdc] Extract removeQuotes as utility method for Oracle column name handling Extract the removeQuotes method to OracleSchemaUtils to centralize Oracle table/column name case handling logic. This method follows Oracle naming rules: names enclosed in double quotes retain their original case, otherwise they are converted to uppercase. This change improves code reuse between OracleSchemaUtils and BaseParserListener, making it easier to add configuration options for case handling in the future. Co-Authored-By: Claude Opus 4.6 --- .../source/parser/BaseParserListener.java | 19 ++--------- .../oracle/utils/OracleSchemaUtils.java | 34 ++++++++++++++++--- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java index efd4d304356..f56fd3a2ba0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.connectors.oracle.source.parser; +import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils; + import io.debezium.ddl.parser.oracle.generated.PlSqlParser; import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener; @@ -76,21 +78,6 @@ public String getColumnName(final PlSqlParser.New_column_nameContext ctx) { * @return parsed table or column name from the supplied name argument */ private static String getTableOrColumnName(String name) { - return removeQuotes(name, true); - } - - /** - * Removes leading and trailing double quote characters from the provided string. - * - * @param text value to have double quotes removed - * @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted - * @return string that has had quotes removed - */ - @SuppressWarnings("SameParameterValue") - private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) { - if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) { - return text.substring(1, text.length() - 1); - } - return upperCaseIfNotQuoted ? text.toUpperCase() : text; + return OracleSchemaUtils.removeQuotes(name); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java index d6fc6437479..083442b5170 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java @@ -46,6 +46,25 @@ public class OracleSchemaUtils { private static final Logger LOG = LoggerFactory.getLogger(OracleSchemaUtils.class); + /** + * Removes leading and trailing double quote characters from the provided string. + * + *

Oracle table and column names are inherently stored in upper-case; however, if the objects + * are created using double-quotes, the case of the object name is retained. This method will + * adhere to those rules and will always return the name in upper-case unless the provided name + * is double-quoted in which the returned value will have the double-quotes removed and case + * retained. + * + * @param text value to have double quotes removed + * @return string that has had quotes removed + */ + public static String removeQuotes(String text) { + if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) { + return text.substring(1, text.length() - 1); + } + return text.toUpperCase(); + } + public static List listTables( OracleSourceConfig sourceConfig, @Nullable String dbName) { try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) { @@ -127,16 +146,23 @@ public static Schema toSchema(Table table) { .map(OracleSchemaUtils::toColumn) .collect(Collectors.toList()); + List primaryKeys = + table.primaryKeyColumnNames().stream() + .map(OracleSchemaUtils::removeQuotes) + .collect(Collectors.toList()); + return Schema.newBuilder() .setColumns(columns) - .primaryKey(table.primaryKeyColumnNames()) + .primaryKey(primaryKeys) .comment(table.comment()) .build(); } public static Column toColumn(io.debezium.relational.Column column) { return Column.physicalColumn( - column.name(), OracleTypeUtils.fromDbzColumn(column), column.comment()); + removeQuotes(column.name()), + OracleTypeUtils.fromDbzColumn(column), + column.comment()); } public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { @@ -158,7 +184,7 @@ public static Schema getSchema( DataType dataType = OracleTypeUtils.fromDbzColumn(column); org.apache.flink.cdc.common.schema.Column cdcColumn = org.apache.flink.cdc.common.schema.Column.physicalColumn( - column.name(), dataType); + removeQuotes(column.name()), dataType); list.add(cdcColumn); } return Schema.newBuilder().setColumns(list).primaryKey(pks).build(); @@ -181,7 +207,7 @@ public static List getTablePks( while (rs.next()) { String columnName; columnName = rs.getString(1); - list.add(columnName); + list.add(removeQuotes(columnName)); } return list; });