diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
index efd4d304356..f56fd3a2ba0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
@@ -17,6 +17,8 @@
package org.apache.flink.cdc.connectors.oracle.source.parser;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
@@ -76,21 +78,6 @@ public String getColumnName(final PlSqlParser.New_column_nameContext ctx) {
* @return parsed table or column name from the supplied name argument
*/
private static String getTableOrColumnName(String name) {
- return removeQuotes(name, true);
- }
-
- /**
- * Removes leading and trailing double quote characters from the provided string.
- *
- * @param text value to have double quotes removed
- * @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted
- * @return string that has had quotes removed
- */
- @SuppressWarnings("SameParameterValue")
- private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) {
- if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
- return text.substring(1, text.length() - 1);
- }
- return upperCaseIfNotQuoted ? text.toUpperCase() : text;
+ return OracleSchemaUtils.removeQuotes(name);
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
index e81c4110cfb..083442b5170 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
@@ -46,6 +46,25 @@ public class OracleSchemaUtils {
private static final Logger LOG = LoggerFactory.getLogger(OracleSchemaUtils.class);
+ /**
+ * Removes leading and trailing double quote characters from the provided string.
+ *
+ *
Oracle table and column names are inherently stored in upper-case; however, if the objects
+ * are created using double-quotes, the case of the object name is retained. This method will
+ * adhere to those rules and will always return the name in upper-case unless the provided name
+ * is double-quoted in which the returned value will have the double-quotes removed and case
+ * retained.
+ *
+ * @param text value to have double quotes removed
+ * @return string that has had quotes removed
+ */
+ public static String removeQuotes(String text) {
+ if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
+ return text.substring(1, text.length() - 1);
+ }
+ return text.toUpperCase();
+ }
+
public static List listTables(
OracleSourceConfig sourceConfig, @Nullable String dbName) {
try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) {
@@ -127,16 +146,23 @@ public static Schema toSchema(Table table) {
.map(OracleSchemaUtils::toColumn)
.collect(Collectors.toList());
+ List primaryKeys =
+ table.primaryKeyColumnNames().stream()
+ .map(OracleSchemaUtils::removeQuotes)
+ .collect(Collectors.toList());
+
return Schema.newBuilder()
.setColumns(columns)
- .primaryKey(table.primaryKeyColumnNames())
+ .primaryKey(primaryKeys)
.comment(table.comment())
.build();
}
public static Column toColumn(io.debezium.relational.Column column) {
return Column.physicalColumn(
- column.name(), OracleTypeUtils.fromDbzColumn(column), column.comment());
+ removeQuotes(column.name()),
+ OracleTypeUtils.fromDbzColumn(column),
+ column.comment());
}
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
@@ -158,7 +184,7 @@ public static Schema getSchema(
DataType dataType = OracleTypeUtils.fromDbzColumn(column);
org.apache.flink.cdc.common.schema.Column cdcColumn =
org.apache.flink.cdc.common.schema.Column.physicalColumn(
- column.name().toLowerCase(Locale.ROOT), dataType);
+ removeQuotes(column.name()), dataType);
list.add(cdcColumn);
}
return Schema.newBuilder().setColumns(list).primaryKey(pks).build();
@@ -181,7 +207,7 @@ public static List getTablePks(
while (rs.next()) {
String columnName;
columnName = rs.getString(1);
- list.add(columnName.toLowerCase(Locale.ROOT));
+ list.add(removeQuotes(columnName));
}
return list;
});
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
index cd90caad827..6f13b7562ef 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
@@ -981,10 +981,10 @@ public void testGeometryType() throws Exception {
new CreateTableEvent(
TableId.tableId("DEBEZIUM", "MYLAKE"),
Schema.newBuilder()
- .physicalColumn("feature_id", DataTypes.BIGINT().notNull())
- .physicalColumn("name", DataTypes.VARCHAR(32))
- .physicalColumn("shape", DataTypes.STRING())
- .primaryKey(Arrays.asList("feature_id"))
+ .physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull())
+ .physicalColumn("NAME", DataTypes.VARCHAR(32))
+ .physicalColumn("SHAPE", DataTypes.STRING())
+ .primaryKey(Arrays.asList("FEATURE_ID"))
.build());
RowType rowType =
@@ -1568,11 +1568,11 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
Schema.newBuilder()
- .physicalColumn("id", DataTypes.BIGINT().notNull())
- .physicalColumn("name", DataTypes.VARCHAR(255).notNull())
- .physicalColumn("description", DataTypes.VARCHAR(512))
- .physicalColumn("weight", DataTypes.FLOAT())
- .primaryKey(Collections.singletonList("id"))
+ .physicalColumn("ID", DataTypes.BIGINT().notNull())
+ .physicalColumn("NAME", DataTypes.VARCHAR(255).notNull())
+ .physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512))
+ .physicalColumn("WEIGHT", DataTypes.FLOAT())
+ .primaryKey(Collections.singletonList("ID"))
.build());
}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
index 0be4734e79c..e6b5387a224 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
@@ -150,7 +150,7 @@ void testSyncWholeDatabase() throws Exception {
Statement stat = conn.createStatement()) {
waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}");
+ "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}");
waitUntilSpecificEvent(
@@ -171,7 +171,7 @@ void testSyncWholeDatabase() throws Exception {
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");
waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
+ "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
waitUntilSpecificEvent(
@@ -225,7 +225,7 @@ void testSyncWholeDatabase() throws Exception {
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}");
waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
+ "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
waitUntilSpecificEvent(