Skip to content

Commit ad229f9

Browse files
authored
[FLINK-38964][table] Reuse Calcite's SqlValidatorImpl#maybeCast
1 parent edbe964 commit ad229f9

7 files changed

Lines changed: 21 additions & 61 deletions

File tree

flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,7 @@ private static int calculatePermuteOffset(List<SqlNode> selectItems) {
776776
return 0;
777777
}
778778

779-
private SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
779+
protected SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
780780
return SqlTypeUtil.equalSansNullability(typeFactory, currentType, desiredType)
781781
? node
782782
: SqlStdOperatorTable.CAST.createCall(

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,11 @@ protected void addToSelectList(
431431
return rewritten;
432432
}
433433

434+
@Override
435+
public SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
436+
return super.maybeCast(node, currentType, desiredType);
437+
}
438+
434439
// --------------------------------------------------------------------------------------------
435440
// Column expansion
436441
// --------------------------------------------------------------------------------------------

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.table.api.TableConfig;
2222
import org.apache.flink.table.catalog.CatalogManager;
23+
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
2324
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
2425
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
2526
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
@@ -36,7 +37,6 @@
3637
import org.apache.calcite.sql.SqlNode;
3738
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
3839
import org.apache.calcite.sql.parser.SqlParser;
39-
import org.apache.calcite.sql.validate.SqlValidator;
4040

4141
import javax.annotation.Nullable;
4242

@@ -63,7 +63,7 @@ public TableConfig getTableConfig() {
6363
}
6464

6565
@Override
66-
public SqlValidator getSqlValidator() {
66+
public FlinkCalciteSqlValidator getSqlValidator() {
6767
return flinkPlanner.getOrCreateSqlValidator();
6868
}
6969

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@
6767

6868
/** A utility class with logic for handling the {@code CREATE TABLE ... AS SELECT} clause. */
6969
public class MergeTableAsUtil {
70-
private final SqlValidator validator;
70+
private final FlinkCalciteSqlValidator validator;
7171
private final Function<SqlNode, String> escapeExpression;
7272
private final DataTypeFactory dataTypeFactory;
7373

7474
public MergeTableAsUtil(
75-
SqlValidator validator,
75+
FlinkCalciteSqlValidator validator,
7676
Function<SqlNode, String> escapeExpression,
7777
DataTypeFactory dataTypeFactory) {
7878
this.validator = validator;
@@ -135,11 +135,10 @@ public PlannerQueryOperation maybeRewriteQuery(
135135

136136
assignedFields.put(
137137
pos,
138-
rewriterUtils.maybeCast(
138+
validator.maybeCast(
139139
SqlLiteral.createNull(SqlParserPos.ZERO),
140140
typeFactory.createUnknownType(),
141-
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
142-
typeFactory));
141+
typeFactory.createFieldTypeFromLogicalType(targetField.getType())));
143142
} else {
144143
targetPositions.add(sourceFields.get(targetField.getName()));
145144
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.table.api.TableEnvironment;
2323
import org.apache.flink.table.catalog.CatalogManager;
2424
import org.apache.flink.table.operations.Operation;
25+
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
2526
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
2627
import org.apache.flink.table.planner.utils.Expander;
2728
import org.apache.flink.table.types.DataType;
@@ -79,7 +80,7 @@ interface ConvertContext {
7980
TableConfig getTableConfig();
8081

8182
/** Returns the {@link SqlValidator} in the convert context. */
82-
SqlValidator getSqlValidator();
83+
FlinkCalciteSqlValidator getSqlValidator();
8384

8485
/** Returns the {@link CatalogManager} in the convert context. */
8586
CatalogManager getCatalogManager();

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,24 @@
1717
*/
1818
package org.apache.flink.table.planner.calcite
1919

20-
import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
2120
import org.apache.flink.sql.parser.SqlProperty
2221
import org.apache.flink.sql.parser.dml.RichSqlInsert
2322
import org.apache.flink.sql.parser.dql.SqlRichExplain
2423
import org.apache.flink.table.api.ValidationException
2524
import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
26-
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
2725
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
28-
import org.apache.flink.util.Preconditions.checkArgument
2926

3027
import org.apache.calcite.plan.RelOptTable
3128
import org.apache.calcite.prepare.CalciteCatalogReader
3229
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
3330
import org.apache.calcite.runtime.{CalciteContextException, Resources}
34-
import org.apache.calcite.sql.`type`.SqlTypeUtil
35-
import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, SqlUtil}
36-
import org.apache.calcite.sql.fun.SqlStdOperatorTable
31+
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode, SqlNodeList, SqlTableRef, SqlUtil}
3732
import org.apache.calcite.sql.parser.SqlParserPos
3833
import org.apache.calcite.sql.util.SqlBasicVisitor
3934
import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
4035
import org.apache.calcite.util.Static.RESOURCE
4136

4237
import java.util
43-
import java.util.Collections
4438

4539
import scala.collection.JavaConversions._
4640

@@ -153,11 +147,7 @@ object PreValidateReWriter {
153147
val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
154148
assignedFields.put(
155149
targetField.getIndex,
156-
rewriterUtils.maybeCast(
157-
value,
158-
value.createSqlType(typeFactory),
159-
targetField.getType,
160-
typeFactory))
150+
validator.maybeCast(value, value.createSqlType(typeFactory), targetField.getType))
161151
}
162152

163153
// validate partial insert columns.
@@ -205,11 +195,11 @@ object PreValidateReWriter {
205195
validateField(idx => !assignedFields.contains(idx), id, targetField)
206196
assignedFields.put(
207197
targetField.getIndex,
208-
rewriterUtils.maybeCast(
198+
validator.maybeCast(
209199
SqlLiteral.createNull(SqlParserPos.ZERO),
210200
typeFactory.createUnknownType(),
211-
targetField.getType,
212-
typeFactory)
201+
targetField.getType
202+
)
213203
)
214204
} else {
215205
// handle reorder

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,15 @@
1717
*/
1818
package org.apache.flink.table.planner.calcite
1919

20-
import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
2120
import org.apache.flink.table.api.ValidationException
2221
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.ExplicitTableSqlSelect
2322
import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, rewriteSqlSelect, rewriteSqlValues, rewriteSqlWith}
2423
import org.apache.flink.util.Preconditions.checkArgument
2524

26-
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
25+
import org.apache.calcite.rel.`type`.RelDataType
2726
import org.apache.calcite.runtime.{CalciteContextException, Resources}
28-
import org.apache.calcite.sql.`type`.SqlTypeUtil
29-
import org.apache.calcite.sql.{SqlBasicCall, SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith}
27+
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith}
3028
import org.apache.calcite.sql.fun.SqlStdOperatorTable
31-
import org.apache.calcite.sql.parser.SqlParserPos
3229
import org.apache.calcite.sql.validate.SqlValidatorException
3330
import org.apache.calcite.util.Static.RESOURCE
3431

@@ -79,38 +76,6 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
7976
targetPosition,
8077
unsupportedErrorMessage)
8178
}
82-
83-
// This code snippet is copied from the SqlValidatorImpl.
84-
def maybeCast(
85-
node: SqlNode,
86-
currentType: RelDataType,
87-
desiredType: RelDataType,
88-
typeFactory: RelDataTypeFactory): SqlNode = {
89-
if (
90-
currentType == desiredType
91-
|| (currentType.isNullable != desiredType.isNullable
92-
&& typeFactory.createTypeWithNullability(currentType, desiredType.isNullable)
93-
== desiredType)
94-
) {
95-
node
96-
} else {
97-
// See FLINK-26460 for more details
98-
val sqlDataTypeSpec =
99-
if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType)) {
100-
val keyType = desiredType.getKeyType
101-
val valueType = desiredType.getValueType
102-
new SqlDataTypeSpec(
103-
new SqlMapTypeNameSpec(
104-
SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable),
105-
SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable),
106-
SqlParserPos.ZERO),
107-
SqlParserPos.ZERO)
108-
} else {
109-
SqlTypeUtil.convertTypeToSpec(desiredType)
110-
}
111-
SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, sqlDataTypeSpec)
112-
}
113-
}
11479
}
11580

11681
object SqlRewriterUtils {

0 commit comments

Comments (0)