Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 344 additions & 0 deletions datafusion/sqllogictest/test_files/aggregates_simplify.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,344 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#######
# Tests for aggregate optimizations / simplifications
#######

# Fixture: three non-NULL rows plus one all-NULL row, so the aggregate results
# in this file also show how NULL inputs are treated.
# The queries refer to the two VALUES columns as column1 and column2.
statement ok
CREATE TABLE sum_simplify_t AS VALUES (1, 100), (1, 200), (2, 100), (NULL, NULL);

# Baseline SUM of an expression
# column1 + 1 evaluates to 2, 2, 3 and NULL over the fixture rows; the expected total is 7.
query I
SELECT SUM(column1 + 1) FROM sum_simplify_t;
----
7

# The expected plan keeps column1 + 1 as the argument of a single sum aggregate.
query TT
EXPLAIN SELECT SUM(column1 + 1) FROM sum_simplify_t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1))]]
02)--TableScan: sum_simplify_t projection=[column1]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1))]
02)--DataSourceExec: partitions=1, partition_sizes=[1]


# Mixed aggregate expressions with type validation
# Selects the plain SUM(column1) next to SUM(column1 + 1) so the result check
# covers the same three expressions as the EXPLAIN that follows.
# column1 sums to 1 + 1 + 2 = 4 and column1 + 1 sums to 2 + 2 + 3 = 7.
query TII
SELECT arrow_typeof(SUM(column1)), SUM(column1), SUM(column1 + 1) FROM sum_simplify_t;
----
Int64 4 7

# The expected plan has one aggregate node producing both sums; the projection
# applies arrow_typeof to the first of them (index @0).
query TT
EXPLAIN SELECT arrow_typeof(SUM(column1)), SUM(column1), SUM(column1 + 1) FROM sum_simplify_t;
----
logical_plan
01)Projection: arrow_typeof(sum(sum_simplify_t.column1)), sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))
02)--Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))]]
03)----TableScan: sum_simplify_t projection=[column1]
physical_plan
01)ProjectionExec: expr=[arrow_typeof(sum(sum_simplify_t.column1)@0) as arrow_typeof(sum(sum_simplify_t.column1)), sum(sum_simplify_t.column1)@0 as sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))@1 as sum(sum_simplify_t.column1 + Int64(1))]
02)--AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

# Duplicate aggregate expressions
# The same SUM(column1 + 1) is selected twice under two aliases; both columns must be 7.
query II
SELECT SUM(column1 + 1) AS sum_plus_1_a, SUM(column1 + 1) AS sum_plus_1_b FROM sum_simplify_t;
----
7 7

# The expected plan computes a single sum aggregate; both aliases project from it (index @0).
query TT
EXPLAIN SELECT SUM(column1 + 1) AS sum_plus_1_a, SUM(column1 + 1) AS sum_plus_1_b FROM sum_simplify_t;
----
logical_plan
01)Projection: sum(sum_simplify_t.column1 + Int64(1)) AS sum_plus_1_a, sum(sum_simplify_t.column1 + Int64(1)) AS sum_plus_1_b
02)--Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1))]]
03)----TableScan: sum_simplify_t projection=[column1]
physical_plan
01)ProjectionExec: expr=[sum(sum_simplify_t.column1 + Int64(1))@0 as sum_plus_1_a, sum(sum_simplify_t.column1 + Int64(1))@0 as sum_plus_1_b]
02)--AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1))]
03)----DataSourceExec: partitions=1, partition_sizes=[1]


# Constant aggregate expressions
# Both arguments are the constant 3 (2+1 and 3); summed over the four fixture rows each gives 12.
query II
SELECT SUM(2+1), SUM(3) FROM sum_simplify_t;
----
12 12

# In the expected plan 2+1 appears as Int64(3), and both output columns project
# from one shared sum(Int64(3)) aggregate named __common_expr_1.
query TT
EXPLAIN SELECT SUM(2+1), SUM(3) FROM sum_simplify_t;
----
logical_plan
01)Projection: __common_expr_1 AS sum(Int64(2) + Int64(1)), __common_expr_1 AS sum(Int64(3))
02)--Aggregate: groupBy=[[]], aggr=[[sum(Int64(3)) AS __common_expr_1]]
03)----TableScan: sum_simplify_t projection=[]
physical_plan
01)ProjectionExec: expr=[__common_expr_1@0 as sum(Int64(2) + Int64(1)), __common_expr_1@0 as sum(Int64(3))]
02)--AggregateExec: mode=Single, gby=[], aggr=[__common_expr_1]
03)----DataSourceExec: partitions=1, partition_sizes=[1]


# The same column appears in two aggregate arguments with different added constants.
# column1 + 1 sums to 7 and column1 + 2 sums to 3 + 3 + 4 = 10.
query II
SELECT SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t;
----
7 10


# The expected plan keeps two separate sum aggregates, one per argument.
query TT
EXPLAIN SELECT SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))]]
02)--TableScan: sum_simplify_t projection=[column1]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))]
02)--DataSourceExec: partitions=1, partition_sizes=[1]

# Reordered operands: the first argument is written as 1 + column1 instead of column1 + 1.
# The two arguments add different constants (1 and 2), so the results stay 7 and 10.
query II
SELECT SUM(1 + column1), SUM(column1 + 2) FROM sum_simplify_t;
----
7 10

# The expected plan preserves the written operand order of each argument.
query TT
EXPLAIN SELECT SUM(1 + column1), SUM(column1 + 2) FROM sum_simplify_t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[sum(Int64(1) + sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(2))]]
02)--TableScan: sum_simplify_t projection=[column1]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[sum(Int64(1) + sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(2))]
02)--DataSourceExec: partitions=1, partition_sizes=[1]

# DISTINCT aggregates with different arguments
# The distinct non-NULL values are {2, 3} for column1 + 1 and {3, 4} for column1 + 2,
# giving 5 and 7.
query II
SELECT SUM(DISTINCT column1 + 1), SUM(DISTINCT column1 + 2) FROM sum_simplify_t;
----
5 7

# The expected plan keeps both DISTINCT sums in a single aggregate node.
query TT
EXPLAIN SELECT SUM(DISTINCT column1 + 1), SUM(DISTINCT column1 + 2) FROM sum_simplify_t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(DISTINCT sum_simplify_t.column1 + Int64(2))]]
02)--TableScan: sum_simplify_t projection=[column1]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(DISTINCT sum_simplify_t.column1 + Int64(2))]
02)--DataSourceExec: partitions=1, partition_sizes=[1]

# DISTINCT and non-DISTINCT aggregates
# The same argument is summed with and without DISTINCT: 2 + 3 = 5 versus 2 + 2 + 3 = 7.
query II
SELECT SUM(DISTINCT column1 + 1), SUM(column1 + 1) FROM sum_simplify_t;
----
5 7

# The expected plan is a two-level aggregate: the inner level groups by
# column1 + 1 (alias1) while summing it per group (alias2); the outer level sums
# alias1 for the DISTINCT result and alias2 for the plain result.
query TT
EXPLAIN SELECT SUM(DISTINCT column1 + 1), SUM(column1 + 1) FROM sum_simplify_t;
----
logical_plan
01)Projection: sum(alias1) AS sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(alias2) AS sum(sum_simplify_t.column1 + Int64(1))
02)--Aggregate: groupBy=[[]], aggr=[[sum(alias1), sum(alias2)]]
03)----Aggregate: groupBy=[[__common_expr_1 AS alias1]], aggr=[[sum(__common_expr_1) AS alias2]]
04)------Projection: sum_simplify_t.column1 + Int64(1) AS __common_expr_1
05)--------TableScan: sum_simplify_t projection=[column1]
physical_plan
01)ProjectionExec: expr=[sum(alias1)@0 as sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(alias2)@1 as sum(sum_simplify_t.column1 + Int64(1))]
02)--AggregateExec: mode=Final, gby=[], aggr=[sum(alias1), sum(alias2)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(alias1), sum(alias2)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[alias2]
06)----------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
07)------------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[alias2]
08)--------------ProjectionExec: expr=[column1@0 + 1 as __common_expr_1]
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]

# FILTER clauses with different aggregate arguments
# Only the column1 = 2 row passes column1 > 1, giving 2 + 1 = 3.
# No row passes column1 > 2, so the second SUM is NULL.
query II
SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 2) FILTER (WHERE column1 > 2) FROM sum_simplify_t;
----
3 NULL

# The expected plan keeps each FILTER attached to its own sum aggregate.
query TT
EXPLAIN SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 2) FILTER (WHERE column1 > 2) FROM sum_simplify_t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(sum_simplify_t.column1 + Int64(2)) FILTER (WHERE sum_simplify_t.column1 > Int64(2))]]
02)--TableScan: sum_simplify_t projection=[column1]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(sum_simplify_t.column1 + Int64(2)) FILTER (WHERE sum_simplify_t.column1 > Int64(2))]
02)--DataSourceExec: partitions=1, partition_sizes=[1]

# FILTER clauses with the same aggregate argument
# The identical filtered SUM is selected twice under two aliases; both columns must be 3.
query II
SELECT
SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_a,
SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_b
FROM sum_simplify_t;
----
3 3

# The expected plan computes one filtered sum aggregate; both aliases project from it (index @0).
query TT
EXPLAIN SELECT
SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_a,
SUM(column1 + 1) FILTER (WHERE column1 > 1) AS filtered_sum_b
FROM sum_simplify_t;
----
logical_plan
01)Projection: sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)) AS filtered_sum_a, sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)) AS filtered_sum_b
02)--Aggregate: groupBy=[[]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))]]
03)----TableScan: sum_simplify_t projection=[column1]
physical_plan
01)ProjectionExec: expr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))@0 as filtered_sum_a, sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))@0 as filtered_sum_b]
02)--AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1))]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

# Same aggregate argument with different FILTER predicates
# column1 > 1 keeps only the column1 = 2 row (3); column1 > 0 keeps all three non-NULL rows (7).
query II
SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 1) FILTER (WHERE column1 > 0) FROM sum_simplify_t;
----
3 7

# The expected plan projects column1 + 1 once as __common_expr_1 and feeds it to
# both filtered sums; column1 is projected as well because the filters read it.
query TT
EXPLAIN SELECT SUM(column1 + 1) FILTER (WHERE column1 > 1), SUM(column1 + 1) FILTER (WHERE column1 > 0) FROM sum_simplify_t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(__common_expr_1 AS sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(0))]]
02)--Projection: sum_simplify_t.column1 + Int64(1) AS __common_expr_1, sum_simplify_t.column1
03)----TableScan: sum_simplify_t projection=[column1]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(1)), sum(sum_simplify_t.column1 + Int64(1)) FILTER (WHERE sum_simplify_t.column1 > Int64(0))]
02)--ProjectionExec: expr=[column1@0 + 1 as __common_expr_1, column1@0 as column1]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

# Volatile aggregate arguments
# NOTE(review): assuming random() returns values in [0, 1), the first sum over the
# four rows is below 8 and the second is at least 8, so the comparison is always
# true despite the volatile input -- confirm the range against the function docs.
query B
SELECT SUM(random() + 1) < SUM(random() + 2) FROM sum_simplify_t;
----
true

# The expected plan keeps two separate sum aggregates, one per random() argument,
# and shows the comparison rewritten from `a < b` to `b > a` under the original name.
query TT
EXPLAIN SELECT SUM(random() + 1) < SUM(random() + 2) FROM sum_simplify_t;
----
logical_plan
01)Projection: sum(random() + Int64(2)) > sum(random() + Int64(1)) AS sum(random() + Int64(1)) < sum(random() + Int64(2))
02)--Aggregate: groupBy=[[]], aggr=[[sum(random() + Float64(1)) AS sum(random() + Int64(1)), sum(random() + Float64(2)) AS sum(random() + Int64(2))]]
03)----TableScan: sum_simplify_t projection=[]
physical_plan
01)ProjectionExec: expr=[sum(random() + Int64(2))@1 > sum(random() + Int64(1))@0 as sum(random() + Int64(1)) < sum(random() + Int64(2))]
02)--AggregateExec: mode=Single, gby=[], aggr=[sum(random() + Int64(1)), sum(random() + Int64(2))]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

# Checks grouped aggregates with explicit ORDER BY return deterministic row order.
# Groups by column2: 200 holds one row (column1 = 1), 100 holds two rows
# (column1 = 1 and 2), and the NULL group holds only the all-NULL row.
# DESC NULLS LAST puts 200 first and the NULL group last.
query III
SELECT column2, SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t GROUP BY column2 ORDER BY column2 DESC NULLS LAST;
----
200 2 3
100 5 7
NULL NULL NULL

# The expected plan keeps both sums in one aggregate grouped by column2, with the sort applied on top.
query TT
EXPLAIN SELECT column2, SUM(column1 + 1), SUM(column1 + 2) FROM sum_simplify_t GROUP BY column2 ORDER BY column2 DESC NULLS LAST;
----
logical_plan
01)Sort: sum_simplify_t.column2 DESC NULLS LAST
02)--Aggregate: groupBy=[[sum_simplify_t.column2]], aggr=[[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))]]
03)----TableScan: sum_simplify_t projection=[column1, column2]
physical_plan
01)SortPreservingMergeExec: [column2@0 DESC NULLS LAST]
02)--SortExec: expr=[column2@0 DESC NULLS LAST], preserve_partitioning=[true]
03)----AggregateExec: mode=FinalPartitioned, gby=[column2@0 as column2], aggr=[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))]
04)------RepartitionExec: partitioning=Hash([column2@0], 4), input_partitions=1
05)--------AggregateExec: mode=Partial, gby=[column2@1 as column2], aggr=[sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1 + Int64(2))]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]

# Checks commutative forms of equivalent aggregate arguments are simplified consistently.
# 1 + column1 and column1 + 1 are the same sum, so both columns must be 7.
query II
SELECT SUM(1 + column1), SUM(column1 + 1) FROM sum_simplify_t;
----
7 7

# The expected plan computes 1 + column1 once as __common_expr_1 and feeds it to both sums.
query TT
EXPLAIN SELECT SUM(1 + column1), SUM(column1 + 1) FROM sum_simplify_t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS Int64(1) + sum_simplify_t.column1), sum(__common_expr_1 AS sum_simplify_t.column1 + Int64(1))]]
02)--Projection: Int64(1) + sum_simplify_t.column1 AS __common_expr_1
03)----TableScan: sum_simplify_t projection=[column1]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[sum(Int64(1) + sum_simplify_t.column1), sum(sum_simplify_t.column1 + Int64(1))]
02)--ProjectionExec: expr=[1 + column1@0 as __common_expr_1]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

# Checks unsigned overflow edge case from PR discussion using transformed SUM arguments.
# Fixture: a single INTEGER UNSIGNED column holding 4294967295 (2^32 - 1) in two
# rows, each added by its own INSERT statement.
statement ok
CREATE TABLE IF NOT EXISTS tbl (val INTEGER UNSIGNED);

statement ok
INSERT INTO tbl VALUES (4294967295);

statement ok
INSERT INTO tbl VALUES (4294967295);

# Checks transformed SUM results for unsigned max values are preserved.
# Each row contributes 4294967296 to SUM(val + 1) and 4294967297 to SUM(val + 2);
# over two rows that is 8589934592 and 8589934594, reported as Int64.
query TII
SELECT arrow_typeof(SUM(val + 1)), SUM(val + 1), SUM(val + 2) FROM tbl;
----
Int64 8589934592 8589934594

# The expected plan casts val to Int64 once as __common_expr_1 and uses it in both sum arguments.
query TT
EXPLAIN SELECT arrow_typeof(SUM(val + 1)), SUM(val + 1), SUM(val + 2) FROM tbl;
----
logical_plan
01)Projection: arrow_typeof(sum(tbl.val + Int64(1))), sum(tbl.val + Int64(1)), sum(tbl.val + Int64(2))
02)--Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS tbl.val + Int64(1)), sum(__common_expr_1 AS tbl.val + Int64(2))]]
03)----Projection: CAST(tbl.val AS Int64) AS __common_expr_1
04)------TableScan: tbl projection=[val]
physical_plan
01)ProjectionExec: expr=[arrow_typeof(sum(tbl.val + Int64(1))@0) as arrow_typeof(sum(tbl.val + Int64(1))), sum(tbl.val + Int64(1))@0 as sum(tbl.val + Int64(1)), sum(tbl.val + Int64(2))@1 as sum(tbl.val + Int64(2))]
02)--AggregateExec: mode=Single, gby=[], aggr=[sum(tbl.val + Int64(1)), sum(tbl.val + Int64(2))]
03)----ProjectionExec: expr=[CAST(val@0 AS Int64) as __common_expr_1]
04)------DataSourceExec: partitions=1, partition_sizes=[2]

# Checks equivalent rewritten form (SUM + COUNT terms) matches transformed SUM semantics.
# SUM(val) is 8589934590 and COUNT(val) is 2, so SUM(val) + k * COUNT(val) gives
# 8589934592 for k = 1 and 8589934594 for k = 2, the same totals as SUM(val + k).
query RR
SELECT SUM(val) + 1 * COUNT(val), SUM(val) + 2 * COUNT(val) FROM tbl;
----
8589934592 8589934594

# The expected plan casts sum(tbl.val) to Decimal128(20, 0) once as __common_expr_1
# and adds the count term, also cast to Decimal128(20, 0), in the final projection.
query TT
EXPLAIN SELECT SUM(val) + 1 * COUNT(val), SUM(val) + 2 * COUNT(val) FROM tbl;
----
logical_plan
01)Projection: __common_expr_1 + CAST(count(tbl.val) AS Decimal128(20, 0)) AS sum(tbl.val) + Int64(1) * count(tbl.val), __common_expr_1 AS sum(tbl.val) + CAST(Int64(2) * count(tbl.val) AS Decimal128(20, 0))
02)--Projection: CAST(sum(tbl.val) AS Decimal128(20, 0)) AS __common_expr_1, count(tbl.val)
03)----Aggregate: groupBy=[[]], aggr=[[sum(CAST(tbl.val AS UInt64)), count(tbl.val)]]
04)------TableScan: tbl projection=[val]
physical_plan
01)ProjectionExec: expr=[__common_expr_1@0 + CAST(count(tbl.val)@1 AS Decimal128(20, 0)) as sum(tbl.val) + Int64(1) * count(tbl.val), __common_expr_1@0 + CAST(2 * count(tbl.val)@1 AS Decimal128(20, 0)) as sum(tbl.val) + Int64(2) * count(tbl.val)]
02)--ProjectionExec: expr=[CAST(sum(tbl.val)@0 AS Decimal128(20, 0)) as __common_expr_1, count(tbl.val)@1 as count(tbl.val)]
03)----AggregateExec: mode=Single, gby=[], aggr=[sum(tbl.val), count(tbl.val)]
04)------DataSourceExec: partitions=1, partition_sizes=[2]

# Teardown: drop both tables created by this file.
statement ok
DROP TABLE IF EXISTS tbl;

statement ok
DROP TABLE sum_simplify_t;