diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 1b73936914928..d92ed3cfbf2a0 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -184,6 +184,7 @@ "org.apache.calcite.sql.SqlCreate" "org.apache.calcite.sql.SqlDrop" "org.apache.calcite.sql.SqlExplicitModelOperator" + "org.apache.calcite.sql.SqlGroupByAllOperator" "org.apache.calcite.sql.SqlIntervalLiteral" "org.apache.calcite.sql.SqlLambda" "org.apache.calcite.runtime.Resources" diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj index 1f0ae3d455cd8..bc24867f51783 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj +++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj @@ -2596,25 +2596,45 @@ SqlNode Where() : SqlNodeList GroupBy() : { final List list; - final boolean distinct; final Span s; + SqlParserPos pos; + List res; } { { s = span(); } ( - { distinct = true; } - | { distinct = false; } - | { distinct = false; } + + list = GroupingElementList() { /* First alternative: wrap the grouping elements in a single GROUP_BY_DISTINCT call. */ + pos = s.end(this); + res = + ImmutableList.of( + SqlInternalOperators.GROUP_BY_DISTINCT.createCall(pos, list) + ); + return new SqlNodeList(res, pos); + } + | + + ( + LOOKAHEAD( | | ) /* NOTE(review): the lookahead tokens are not visible in this view; presumably they detect a grouping element following ALL - confirm. */ + list = GroupingElementList() { + return new SqlNodeList(list, s.end(this)); + } + | + { /* Otherwise: return a one-element list holding a SqlGroupByAllOperator placeholder call. */ + pos = s.end(this); + res = + ImmutableList.of( + SqlGroupByAllOperator.INSTANCE.createCall(pos) + ); + return new SqlNodeList(res, pos); + } + ) + | + list = GroupingElementList() { /* Last alternative: return the grouping elements unchanged. */ + return new SqlNodeList(list, s.end(this)); + } ) - list = GroupingElementList() { - final SqlParserPos pos = s.end(this); - final List list2 = distinct - ? 
ImmutableList.of( - SqlInternalOperators.GROUP_BY_DISTINCT.createCall(pos, list)) - : list; - return new SqlNodeList(list2, pos); - } } List GroupingElementList() : diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlGroupByAllOperator.java b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlGroupByAllOperator.java new file mode 100644 index 0000000000000..2c6b733b1f218 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlGroupByAllOperator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.calcite.sql; + +/** + * Marker operator for a bare {@code GROUP BY ALL} clause. + * + *

<p>The parser emits a call to this operator as a placeholder, because at parse time it cannot + * know the table's columns or which SELECT expressions are aggregates. {@code FlinkCalciteSqlValidator} + * rewrites the placeholder into the actual grouping expressions during validation, so this operator + * never reaches type derivation or conversion. + */ + +public final class SqlGroupByAllOperator extends SqlSpecialOperator { + public static final SqlGroupByAllOperator INSTANCE = new SqlGroupByAllOperator(); + + private SqlGroupByAllOperator() { + super("GROUP BY ALL", SqlKind.OTHER); + } + + @Override + public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) { + writer.keyword("ALL"); /* Only the ALL keyword is written here; the GROUP BY prefix comes from the enclosing clause. */ + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 1d1e7d639fa00..a95760c9d5b7d 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -132,6 +132,17 @@ void testArrayAgg() { + "GROUP BY `GENDER`"); } + @Test + void testGroupByAll() { + sql("select\n" + + " a, count(*)\n" + + "from t group by all") + .ok( + "SELECT `A`, COUNT(*)\n" + + "FROM `T`\n" + + "GROUP BY (ALL)"); + } + @Test void testCastAsMapType() { this.expr("cast(a as map)").ok("CAST(`A` AS MAP< INTEGER, INTEGER >)"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java index 3c59b56162f0d..29b857fcdebc8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java +++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java @@ -169,7 +169,18 @@ private TableConfigOptions() {} + "and cryptic error message when working on nested data. " + "For example, it prevented using rows in computed columns or join keys. " + "The new behavior takes the nullability into consideration."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Boolean> TABLE_GROUP_BY_ALL_ENABLED = + key("table.group-by-all-enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Enables the 'GROUP BY ALL' clause, a shorthand that groups by every" + + " non-aggregated expression in the SELECT list. Disabled by" + + " default during the initial rollout; will be enabled by default" + + " in a future release."); + // ------------------------------------------------------------------------------------------ // Options for plan handling // ------------------------------------------------------------------------------------------ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index 77a1634fa0bc2..9fe74dcaeb89a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -44,12 +44,14 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.SchemaVersion; import org.apache.calcite.sql.JoinType; +import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlAsOperator; import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlExplicitModelCall; import 
org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlGroupByAllOperator; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlKind; @@ -68,6 +70,7 @@ import org.apache.calcite.sql.type.SqlOperandMetadata; import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.sql.util.SqlBasicVisitor; import org.apache.calcite.sql.validate.DelegatingScope; import org.apache.calcite.sql.validate.IdentifierNamespace; import org.apache.calcite.sql.validate.IdentifierSnapshotNamespace; @@ -197,6 +200,82 @@ protected void validateJoin(SqlJoin join, SqlValidatorScope scope) { super.validateJoin(join, scope); } + /** Expands a bare {@code GROUP BY ALL} before the regular GROUP BY validation runs. */ + @Override + protected void validateGroupClause(SqlSelect select) { + rewriteGroupByAll(select); + super.validateGroupClause(select); + } + + /** + * Replaces a bare {@code GROUP BY ALL} placeholder with every SELECT item that contains + * neither an aggregate call nor an {@code OVER} call; other GROUP BY clauses are untouched. + * + * @throws ValidationException if the option is disabled or the SELECT list contains a star + */ + private void rewriteGroupByAll(SqlSelect select) { + final SqlNodeList group = select.getGroup(); + if (!isGroupByAll(group)) { + return; + } + + final boolean enabled = + ShortcutUtils.unwrapTableConfig(relOptCluster) + .get(TableConfigOptions.TABLE_GROUP_BY_ALL_ENABLED); + if (!enabled) { + throw new ValidationException( + "GROUP BY ALL is not enabled. 
Set '" + + TableConfigOptions.TABLE_GROUP_BY_ALL_ENABLED.key() + + "' to true to enable it."); + } + + final List<SqlNode> keys = new ArrayList<>(); + for (SqlNode selectItem : select.getSelectList()) { + final SqlNode expr = SqlUtil.stripAs(selectItem); + if (expr instanceof SqlIdentifier && ((SqlIdentifier) expr).isStar()) { + throw new ValidationException( + "GROUP BY ALL does not support '*' in the SELECT list; " + + "please list the grouping columns explicitly."); + } + if (!containsAggregateOrOver(expr)) { + keys.add(expr); + } + } + select.setGroupBy(new SqlNodeList(keys, group.getParserPosition())); + } + + /** Returns whether the clause consists solely of the {@link SqlGroupByAllOperator} marker. */ + private static boolean isGroupByAll(@Nullable SqlNodeList group) { + if (group == null || group.size() != 1) { + return false; + } + + final SqlNode item = group.get(0); + return item instanceof SqlCall + && ((SqlCall) item).getOperator() instanceof SqlGroupByAllOperator; + } + + /** + * Returns whether the expression contains, at any depth, a call to a {@link SqlAggFunction} + * or a call of kind {@link SqlKind#OVER}. + */ + private static boolean containsAggregateOrOver(SqlNode node) { + final boolean[] found = {false}; + node.accept( + new SqlBasicVisitor<Void>() { + @Override + public Void visit(SqlCall call) { + if (call.getOperator() instanceof SqlAggFunction + || call.getKind() == SqlKind.OVER) { + found[0] = true; + return null; + } + return super.visit(call); + } + }); + return found[0]; + } + @Override protected void registerNamespace( @Nullable SqlValidatorScope usingScope, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/GroupByAllTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/GroupByAllTest.java new file mode 100644 index 0000000000000..379be49dc3475 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/GroupByAllTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.batch.sql; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for the {@code GROUP BY ALL} clause. 
*/ +class GroupByAllTest { + + // TODO: add a test whose SELECT list contains only non-aggregated columns + + @Test + void testGroupByAllByNonAggregateColumns() { + final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + tEnv.getConfig().set(TableConfigOptions.TABLE_GROUP_BY_ALL_ENABLED, true); + + final List<Row> actual = + CollectionUtil.iteratorToList( + tEnv.executeSql( + "SELECT city, COUNT(*) AS cnt " + + "FROM (VALUES ('Beijing'), ('Shanghai'), ('Beijing')) AS t(city) " + + "GROUP BY ALL") + .collect() + ); + assertThat(actual).containsExactlyInAnyOrder(Row.of("Beijing", 2L), Row.of("Shanghai", 1L)); + } + + @Test + void testGroupByAllDisabledByDefault() { + final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + + assertThatThrownBy( + () -> tEnv.executeSql( + "SELECT city, COUNT(*) AS cnt " + + "FROM (VALUES ('Beijing'), ('Shanghai'), ('Beijing')) AS t(city) " + + "GROUP BY ALL") + .collect()) + .hasMessageContaining("GROUP BY ALL is not enabled"); + } + + @Test + void testGroupByAllWithOnlyAggregates() { + final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + tEnv.getConfig().set(TableConfigOptions.TABLE_GROUP_BY_ALL_ENABLED, true); + + final List<Row> actual = + CollectionUtil.iteratorToList( + tEnv.executeSql( + "SELECT COUNT(*) AS cnt " + + "FROM (VALUES ('Beijing'), ('Shanghai'), ('Beijing')) AS t(city) " + + "GROUP BY ALL") + .collect()); + assertThat(actual).containsExactly(Row.of(3L)); + } + + @Test + void testGroupByAllGroupsByWholeExpression() { + final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + tEnv.getConfig().set(TableConfigOptions.TABLE_GROUP_BY_ALL_ENABLED, true); + + final List<Row> actual = + CollectionUtil.iteratorToList( + tEnv.executeSql( + "SELECT a + b AS s, COUNT(*) AS cnt " + + "FROM (VALUES (1, 2), (3, 4), (1, 2)) AS t(a, b) " + + "GROUP BY ALL") + .collect()); + assertThat(actual) + .containsExactlyInAnyOrder( + Row.of(3, 2L), 
Row.of(7, 1L)); + } + + @Test + void testGroupByAllExcludesWindowFunctions() { + final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + tEnv.getConfig().set(TableConfigOptions.TABLE_GROUP_BY_ALL_ENABLED, true); + + final List<Row> actual = + CollectionUtil.iteratorToList( + tEnv.executeSql( + "SELECT city, ROW_NUMBER() OVER (ORDER BY city) AS rn " + + "FROM (VALUES ('Beijing'), ('Shanghai'), ('Beijing')) AS t(city) " + + "GROUP BY ALL") + .collect()); + assertThat(actual).containsExactlyInAnyOrder(Row.of("Beijing", 1L), Row.of("Shanghai", 2L)); + } + + @Test + void testGroupByAllErrorsOnBareColumnInsideAggregateExpression() { + final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + tEnv.getConfig().set(TableConfigOptions.TABLE_GROUP_BY_ALL_ENABLED, true); + + assertThatThrownBy( + () -> tEnv.executeSql( + "SELECT a + COUNT(*) " + + "FROM (VALUES (1), (2)) AS t(a) " + + "GROUP BY ALL") + .collect()) + .hasMessageContaining("not being grouped"); + } +}