diff --git a/datafusion/sqllogictest/test_files/aggregates_simplify.slt b/datafusion/sqllogictest/test_files/aggregates_simplify.slt new file mode 100644 index 000000000000..cc2e40540b98 --- /dev/null +++ b/datafusion/sqllogictest/test_files/aggregates_simplify.slt @@ -0,0 +1,344 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +####### +# Tests for aggregate optimizations / simplifications +####### + +statement ok +CREATE TABLE sum_simplify_t AS VALUES (1, 100), (1, 200), (2, 100), (NULL, NULL); + +# Baseline SUM of an expression +query I +SELECT SUM(column1 + 1) FROM sum_simplify_t; +---- +7 + +query TT +EXPLAIN SELECT SUM(column1 + 1) FROM sum_simplify_t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1))]] +02)--TableScan: sum_simplify_t projection=[column1] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1))] +02)--DataSourceExec: partitions=1, partition_sizes=[1] + + +# Mixed aggregate expressions with type validation +query TI +SELECT arrow_typeof(SUM(column1)), SUM(column1 + 1) FROM sum_simplify_t; +---- +Int64 7 + +query TT +EXPLAIN SELECT arrow_typeof(SUM(column1)), SUM(column1), SUM(column1 + 1) FROM sum_simplify_t; +---- +logical_plan +01)Projection: arrow_typeof(sum(sum_simplify_t.column1)), sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1)) +02)--Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))]] +03)----TableScan: sum_simplify_t projection=[column1] +physical_plan +01)ProjectionExec: expr=[arrow_typeof(sum(sum_simplify_t.column1)@0) as arrow_typeof(sum(sum_simplify_t.column1)), sum(sum_simplify_t.column1)@0 as sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))@1 as sum(sum_simplify_t.column1 + Int64(1))] +02)--AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# Duplicate aggregate expressions +query II +SELECT SUM(column1 + 1) AS sum_plus_1_a, SUM(column1 + 1) AS sum_plus_1_b FROM sum_simplify_t; +---- +7 7 + +query TT +EXPLAIN SELECT SUM(column1 + 1) AS sum_plus_1_a, SUM(column1 + 1) AS sum_plus_1_b FROM sum_simplify_t; +---- +logical_plan +01)Projection: sum(sum_simplify_t.column1 + Int64(1)) AS sum_plus_1_a, sum(sum_simplify_t.column1 + Int64(1)) AS sum_plus_1_b +02)--Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1))]] +03)----TableScan: sum_simplify_t projection=[column1] +physical_plan +01)ProjectionExec: expr=[sum(sum_simplify_t.column1 + Int64(1))@0 as sum_plus_1_a, sum(sum_simplify_t.column1 + Int64(1))@0 as sum_plus_1_b] +02)--AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1))] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + + +# constant aggregate expressions +query II +SELECT SUM(2+1), SUM(3) FROM sum_simplify_t; +---- +12 12 + +query TT +EXPLAIN SELECT SUM(2+1), SUM(3) FROM sum_simplify_t; +---- +logical_plan +01)Projection: __common_expr_1 AS sum(Int64(2) + Int64(1)), __common_expr_1 AS sum(Int64(3)) +02)--Aggregate: groupBy=[[]], aggr=[[sum(Int64(3)) AS __common_expr_1]] +03)----TableScan: sum_simplify_t projection=[] +physical_plan +01)ProjectionExec: expr=[__common_expr_1@0 as sum(Int64(2) + Int64(1)), __common_expr_1@0 as sum(Int64(3))] +02)--AggregateExec: mode=Single, gby=[], aggr=[__common_expr_1] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + + +# Duplicated expression across multiple aggregate arguments. +query II +SELECT SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t; +---- +7 10 + + +query TT +EXPLAIN SELECT SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))]] +02)--TableScan: sum_simplify_t projection=[column1] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))] +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Reordered expressions that still compute the same thing +query II +SELECT SUM(1 + column1), SUM(column1 + 2) FROM sum_simplify_t; +---- +7 10 + +query TT +EXPLAIN SELECT SUM(1 + column1), SUM(column1 + 2) FROM sum_simplify_t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[sum(Int64(1) + sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(2))]] +02)--TableScan: sum_simplify_t projection=[column1] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[sum(Int64(1) + sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(2))] +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# DISTINCT aggregates with different arguments +query II +SELECT SUM(DISTINCT column1 + 1), SUM(DISTINCT column1 + 2) FROM sum_simplify_t; +---- +5 7 + +query TT +EXPLAIN SELECT SUM(DISTINCT column1 + 1), SUM(DISTINCT column1 + 2) FROM sum_simplify_t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(DISTINCT sum_simplify_t.column1 + Int64(2))]] +02)--TableScan: sum_simplify_t projection=[column1] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(DISTINCT sum_simplify_t.column1 + Int64(2))] +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# DISTINCT and non-DISTINCT aggregates +query II +SELECT SUM(DISTINCT column1 + 1), SUM(column1 + 1) FROM sum_simplify_t; +---- +5 7 + +query TT +EXPLAIN SELECT SUM(DISTINCT column1 + 1), SUM(column1 + 1) FROM sum_simplify_t; +---- +logical_plan +01)Projection: sum(alias1) AS sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(alias2) AS sum(sum_simplify_t.column1 + Int64(1)) +02)--Aggregate: groupBy=[[]], aggr=[[sum(alias1), sum(alias2)]] +03)----Aggregate: groupBy=[[__common_expr_1 AS alias1]], aggr=[[sum(__common_expr_1) AS alias2]] +04)------Projection: sum_simplify_t.column1 + Int64(1) AS __common_expr_1 +05)--------TableScan: sum_simplify_t projection=[column1] +physical_plan +01)ProjectionExec: expr=[sum(alias1)@0 as sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(alias2)@1 as sum(sum_simplify_t.column1 + Int64(1))] +02)--AggregateExec: mode=Final, gby=[], aggr=[sum(alias1), sum(alias2)] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(alias1), sum(alias2)] +05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[alias2] +06)----------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1 +07)------------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[alias2] +08)--------------ProjectionExec: expr=[column1@0 + 1 as __common_expr_1] +09)----------------DataSourceExec: partitions=1, partition_sizes=[1] + +# FILTER clauses with different aggregate arguments +query II +SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 2) FILTER (WHERE column1 > 2) FROM sum_simplify_t; +---- +3 NULL + +query TT +EXPLAIN SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 2) FILTER (WHERE column1 > 2) FROM sum_simplify_t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(sum_simplify_t.column1 + Int64(2)) FILTER (WHERE sum_simplify_t.column1 > Int64(2))]] +02)--TableScan: sum_simplify_t projection=[column1] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(sum_simplify_t.column1 + Int64(2)) FILTER (WHERE sum_simplify_t.column1 > Int64(2))] +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# FILTER clauses with the same aggregate argument +query II +SELECT + SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_a, + SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_b +FROM sum_simplify_t; +---- +3 3 + +query TT +EXPLAIN SELECT + SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_a, + SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_b +FROM sum_simplify_t; +---- +logical_plan +01)Projection: sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)) AS filtered_sum_a, sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)) AS filtered_sum_b +02)--Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))]] +03)----TableScan: sum_simplify_t projection=[column1] +physical_plan +01)ProjectionExec: expr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))@0 as filtered_sum_a, sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))@0 as filtered_sum_b] +02)--AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# Same aggregate argument with different FILTER predicates +query II +SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 1) FILTER (WHERE column1 > 0) FROM sum_simplify_t; +---- +3 7 + +query TT +EXPLAIN SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 1) FILTER (WHERE column1 > 0) FROM sum_simplify_t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(__common_expr_1 AS sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(0))]] +02)--Projection: sum_simplify_t.column1 + Int64(1) AS __common_expr_1, sum_simplify_t.column1 +03)----TableScan: sum_simplify_t projection=[column1] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(0))] +02)--ProjectionExec: expr=[column1@0 + 1 as __common_expr_1, column1@0 as column1] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# volatile aggregate arguments +query B +SELECT SUM(random() + 1) < SUM(random() + 2) FROM sum_simplify_t; +---- +true + +query TT +EXPLAIN SELECT SUM(random() + 1) < SUM(random() + 2) FROM sum_simplify_t; +---- +logical_plan +01)Projection: sum(random() + Int64(2)) > sum(random() + Int64(1)) AS sum(random() + Int64(1)) < sum(random() + Int64(2)) +02)--Aggregate: groupBy=[[]], aggr=[[sum(random() + Float64(1)) AS sum(random() + Int64(1)), sum(random() + Float64(2)) AS sum(random() + Int64(2))]] +03)----TableScan: sum_simplify_t projection=[] +physical_plan +01)ProjectionExec: expr=[sum(random() + Int64(2))@1 > sum(random() + Int64(1))@0 as sum(random() + Int64(1)) < sum(random() + Int64(2))] +02)--AggregateExec: mode=Single, gby=[], aggr=[sum(random() + Int64(1)), sum(random() + Int64(2))] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# Checks grouped aggregates with explicit ORDER BY return deterministic row order. +query III +SELECT column2, SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t GROUP BY column2 ORDER BY column2 DESC NULLS LAST; +---- +200 2 3 +100 5 7 +NULL NULL NULL + +query TT +EXPLAIN SELECT column2, SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t GROUP BY column2 ORDER BY column2 DESC NULLS LAST; +---- +logical_plan +01)Sort: sum_simplify_t.column2 DESC NULLS LAST +02)--Aggregate: groupBy=[[sum_simplify_t.column2]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))]] +03)----TableScan: sum_simplify_t projection=[column1, column2] +physical_plan +01)SortPreservingMergeExec: [column2@0 DESC NULLS LAST] +02)--SortExec: expr=[column2@0 DESC NULLS LAST], preserve_partitioning=[true] +03)----AggregateExec: mode=FinalPartitioned, gby=[column2@0 as column2], aggr=[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))] +04)------RepartitionExec: partitioning=Hash([column2@0], 4), input_partitions=1 +05)--------AggregateExec: mode=Partial, gby=[column2@1 as column2], aggr=[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))] +06)----------DataSourceExec: partitions=1, partition_sizes=[1] + +# Checks commutative forms of equivalent aggregate arguments are simplified consistently. +query II +SELECT SUM(1 + column1), SUM(column1 + 1) FROM sum_simplify_t; +---- +7 7 + +query TT +EXPLAIN SELECT SUM(1 + column1), SUM(column1 + 1) FROM sum_simplify_t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS Int64(1) + sum_simplify_t.column1), sum(__common_expr_1 AS sum_simplify_t.column1 + Int64(1))]] +02)--Projection: Int64(1) + sum_simplify_t.column1 AS __common_expr_1 +03)----TableScan: sum_simplify_t projection=[column1] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[sum(Int64(1) + sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))] +02)--ProjectionExec: expr=[1 + column1@0 as __common_expr_1] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# Checks unsigned overflow edge case from PR discussion using transformed SUM arguments. +statement ok +CREATE TABLE IF NOT EXISTS tbl (val INTEGER UNSIGNED); + +statement ok +INSERT INTO tbl VALUES (4294967295); + +statement ok +INSERT INTO tbl VALUES (4294967295); + +# Checks transformed SUM results for unsigned max values are preserved. +query TII +SELECT arrow_typeof(SUM(val + 1)), SUM(val + 1), SUM(val + 2) FROM tbl; +---- +Int64 8589934592 8589934594 + +query TT +EXPLAIN SELECT arrow_typeof(SUM(val + 1)), SUM(val + 1), SUM(val + 2) FROM tbl; +---- +logical_plan +01)Projection: arrow_typeof(sum(tbl.val + Int64(1))), sum(tbl.val + Int64(1)), sum(tbl.val + Int64(2)) +02)--Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS tbl.val + Int64(1)), sum(__common_expr_1 AS tbl.val + Int64(2))]] +03)----Projection: CAST(tbl.val AS Int64) AS __common_expr_1 +04)------TableScan: tbl projection=[val] +physical_plan +01)ProjectionExec: expr=[arrow_typeof(sum(tbl.val + Int64(1))@0) as arrow_typeof(sum(tbl.val + Int64(1))), sum(tbl.val + Int64(1))@0 as sum(tbl.val + Int64(1)), sum(tbl.val + Int64(2))@1 as sum(tbl.val + Int64(2))] +02)--AggregateExec: mode=Single, gby=[], aggr=[sum(tbl.val + Int64(1)), sum(tbl.val + Int64(2))] +03)----ProjectionExec: expr=[CAST(val@0 AS Int64) as __common_expr_1] +04)------DataSourceExec: partitions=1, partition_sizes=[2] + +# Checks equivalent rewritten form (SUM + COUNT terms) matches transformed SUM semantics. +query RR +SELECT SUM(val) + 1 * COUNT(val), SUM(val) + 2 * COUNT(val) FROM tbl; +---- +8589934592 8589934594 + +query TT +EXPLAIN SELECT SUM(val) + 1 * COUNT(val), SUM(val) + 2 * COUNT(val) FROM tbl; +---- +logical_plan +01)Projection: __common_expr_1 + CAST(count(tbl.val) AS Decimal128(20, 0)) AS sum(tbl.val) + Int64(1) * count(tbl.val), __common_expr_1 AS sum(tbl.val) + CAST(Int64(2) * count(tbl.val) AS Decimal128(20, 0)) +02)--Projection: CAST(sum(tbl.val) AS Decimal128(20, 0)) AS __common_expr_1, count(tbl.val) +03)----Aggregate: groupBy=[[]], aggr=[[sum(CAST(tbl.val AS UInt64)), count(tbl.val)]] +04)------TableScan: tbl projection=[val] +physical_plan +01)ProjectionExec: expr=[__common_expr_1@0 + CAST(count(tbl.val)@1 AS Decimal128(20, 0)) as sum(tbl.val) + Int64(1) * count(tbl.val), __common_expr_1@0 + CAST(2 * count(tbl.val)@1 AS Decimal128(20, 0)) as sum(tbl.val) + Int64(2) * count(tbl.val)] +02)--ProjectionExec: expr=[CAST(sum(tbl.val)@0 AS Decimal128(20, 0)) as __common_expr_1, count(tbl.val)@1 as count(tbl.val)] +03)----AggregateExec: mode=Single, gby=[], aggr=[sum(tbl.val), count(tbl.val)] +04)------DataSourceExec: partitions=1, partition_sizes=[2] + +statement ok +DROP TABLE IF EXISTS tbl; + +statement ok +DROP TABLE sum_simplify_t;