Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 103 additions & 10 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use crate::utils::{
use arrow::datatypes::DataType;
use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err};
use datafusion_common::{
Column, DFSchema, DFSchemaRef, Result, get_target_functional_dependencies,
not_impl_err, plan_err,
};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::ExprSchemable;
use datafusion_expr::builder::get_struct_unnested_columns;
Expand All @@ -45,8 +48,8 @@ use datafusion_expr::utils::{
expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
LogicalPlanBuilderOptions, Partitioning, SortExpr,
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
SortExpr,
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -90,6 +93,89 @@ fn flatten_expr_groups(expr_groups: Vec<Vec<Expr>>) -> Vec<Expr> {
expr_groups.into_iter().flatten().collect()
}

fn referenced_columns_outside_aggregates(
expr: &Expr,
accum: &mut HashSet<Column>,
) -> Result<()> {
expr.apply(|nested_expr| match nested_expr {
Expr::Column(column) => {
accum.insert(column.clone());
Ok(TreeNodeRecursion::Continue)
}
Expr::AggregateFunction(_) => Ok(TreeNodeRecursion::Jump),
_ => Ok(TreeNodeRecursion::Continue),
})
.map(|_| ())
}

fn required_aggregate_output_columns(
schema: &DFSchemaRef,
select_exprs: &[Expr],
having_expr_opt: Option<&Expr>,
qualify_expr_opt: Option<&Expr>,
order_by_exprs: &[SortExpr],
on_exprs: &[Expr],
) -> Result<HashSet<String>> {
let mut columns = HashSet::new();

for expr in select_exprs {
referenced_columns_outside_aggregates(expr, &mut columns)?;
}
if let Some(expr) = having_expr_opt {
referenced_columns_outside_aggregates(expr, &mut columns)?;
}
if let Some(expr) = qualify_expr_opt {
referenced_columns_outside_aggregates(expr, &mut columns)?;
}
for sort_expr in order_by_exprs {
referenced_columns_outside_aggregates(&sort_expr.expr, &mut columns)?;
}
for expr in on_exprs {
referenced_columns_outside_aggregates(expr, &mut columns)?;
}

let mut required_columns = HashSet::new();
for column in &columns {
if let Ok((qualifier, field)) = schema.qualified_field_from_column(column) {
required_columns.insert(
Expr::Column(Column::from((qualifier, field)))
.schema_name()
.to_string(),
);
}
}

Ok(required_columns)
}

fn add_required_group_by_exprs_from_dependencies(
mut group_expr: Vec<Expr>,
schema: &DFSchemaRef,
required_output_columns: &HashSet<String>,
) -> Result<Vec<Expr>> {
let mut group_by_field_names = group_expr
.iter()
.map(|expr| expr.schema_name().to_string())
.collect::<Vec<_>>();

if let Some(target_indices) =
get_target_functional_dependencies(schema, &group_by_field_names)
{
for idx in target_indices {
let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
let expr_name = expr.schema_name().to_string();
if required_output_columns.contains(&expr_name)
&& !group_by_field_names.contains(&expr_name)
{
group_by_field_names.push(expr_name);
group_expr.push(expr);
}
}
}

Ok(group_expr)
}

impl<S: ContextProvider> SqlToRel<'_, S> {
/// Generate a logic plan from an SQL select
pub(super) fn select_to_plan(
Expand Down Expand Up @@ -733,10 +819,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let agg_expr = agg.aggr_expr.clone();
let (new_input, new_group_by_exprs) =
self.try_process_group_by_unnest(agg)?;
let options = LogicalPlanBuilderOptions::new()
.with_add_implicit_group_by_exprs(true);
LogicalPlanBuilder::from(new_input)
.with_options(options)
.aggregate(new_group_by_exprs, agg_expr)?
.build()
}
Expand Down Expand Up @@ -1179,11 +1262,21 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
aggr_exprs: &[Expr],
) -> Result<AggregatePlanResult> {
// create the aggregate plan
let options =
LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
let required_output_columns = required_aggregate_output_columns(
input.schema(),
select_exprs,
having_expr_opt,
qualify_expr_opt,
order_by_exprs,
on_exprs,
)?;
let group_by_exprs = add_required_group_by_exprs_from_dependencies(
group_by_exprs.to_vec(),
input.schema(),
&required_output_columns,
)?;
let plan = LogicalPlanBuilder::from(input.clone())
.with_options(options)
.aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())?
.aggregate(group_by_exprs, aggr_exprs.to_vec())?
.build()?;
let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan {
&agg.group_expr
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3602,7 +3602,7 @@ SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e


# primary key should be aware from which columns it is associated
statement error DataFusion error: Error during planning: Column in SELECT must be in GROUP BY or an aggregate function: While expanding wildcard, column "r\.sn" must appear in the GROUP BY clause or must be part of an aggregate function, currently only "l\.sn, l\.zip_code, l\.country, l\.ts, l\.currency, l\.amount, sum\(l\.amount\)" appears in the SELECT clause satisfies this requirement
statement error DataFusion error: Error during planning: Column in SELECT must be in GROUP BY or an aggregate function: While expanding wildcard, column "r\.sn" must appear in the GROUP BY clause or must be part of an aggregate function, currently only "l\.sn, sum\(l\.amount\)" appears in the SELECT clause satisfies this requirement
SELECT l.sn, r.sn, SUM(l.amount), r.amount
FROM sales_global_with_pk AS l
JOIN sales_global_with_pk AS r
Expand Down Expand Up @@ -3692,7 +3692,7 @@ ORDER BY r.sn
4 100 2022-01-03T10:00:00

# after join, new window expressions shouldn't be associated with primary keys
statement error DataFusion error: Error during planning: Column in SELECT must be in GROUP BY or an aggregate function: While expanding wildcard, column "rn1" must appear in the GROUP BY clause or must be part of an aggregate function, currently only "r\.sn, r\.ts, r\.amount, sum\(r\.amount\)" appears in the SELECT clause satisfies this requirement
statement error DataFusion error: Error during planning: Column in SELECT must be in GROUP BY or an aggregate function: While expanding wildcard, column "rn1" must appear in the GROUP BY clause or must be part of an aggregate function, currently only "r\.sn, sum\(r\.amount\)" appears in the SELECT clause satisfies this requirement
SELECT r.sn, SUM(r.amount), rn1
FROM
(SELECT r.ts, r.sn, r.amount,
Expand Down
121 changes: 121 additions & 0 deletions datafusion/sqllogictest/test_files/group_by_fd_prune.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Functional-dependency targets should only be added to aggregate
# GROUP BY outputs when the SQL query actually needs them after
# aggregation.
Comment on lines +18 to +20

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests pass without the code changes in this PR.


statement ok
CREATE TABLE fd_group_by_pk (
id INT,
name VARCHAR,
amount FLOAT,
PRIMARY KEY(id)
) AS VALUES
(1, 'a', 10.0),
(2, 'b', 20.0),
(3, 'c', 30.0)

statement ok
set datafusion.explain.logical_plan_only = true;

# Unreferenced columns determined by the primary key are not appended
# to the aggregate group keys.
query TT
EXPLAIN SELECT id, SUM(amount)
FROM fd_group_by_pk
GROUP BY id
ORDER BY id
----
logical_plan
01)Sort: fd_group_by_pk.id ASC NULLS LAST
02)--Aggregate: groupBy=[[fd_group_by_pk.id]], aggr=[[sum(CAST(fd_group_by_pk.amount AS Float64))]]
03)----TableScan: fd_group_by_pk projection=[id, amount]

# SELECT references to functionally-dependent columns still make
# those columns available after aggregation.
query TT
EXPLAIN SELECT id, name, SUM(amount)
FROM fd_group_by_pk
GROUP BY id
ORDER BY id
----
logical_plan
01)Sort: fd_group_by_pk.id ASC NULLS LAST
02)--Aggregate: groupBy=[[fd_group_by_pk.id, fd_group_by_pk.name]], aggr=[[sum(CAST(fd_group_by_pk.amount AS Float64))]]
03)----TableScan: fd_group_by_pk projection=[id, name, amount]

# HAVING references to functionally-dependent columns also keep them
# available for the post-aggregate filter.
query TT
EXPLAIN SELECT id, SUM(amount)
FROM fd_group_by_pk
GROUP BY id
HAVING name IS NOT NULL OR SUM(amount) > 0
ORDER BY id
----
logical_plan
01)Sort: fd_group_by_pk.id ASC NULLS LAST
02)--Projection: fd_group_by_pk.id, sum(fd_group_by_pk.amount)
03)----Filter: fd_group_by_pk.name IS NOT NULL OR sum(fd_group_by_pk.amount) > Float64(0)
04)------Aggregate: groupBy=[[fd_group_by_pk.id, fd_group_by_pk.name]], aggr=[[sum(CAST(fd_group_by_pk.amount AS Float64))]]
05)--------TableScan: fd_group_by_pk projection=[id, name, amount]

# ORDER BY references to functionally-dependent columns keep them
# available as hidden aggregate outputs.
query TT
EXPLAIN SELECT id, SUM(amount)
FROM fd_group_by_pk
GROUP BY id
ORDER BY name
----
logical_plan
01)Projection: fd_group_by_pk.id, sum(fd_group_by_pk.amount)
02)--Sort: fd_group_by_pk.name ASC NULLS LAST
03)----Projection: fd_group_by_pk.id, sum(fd_group_by_pk.amount), fd_group_by_pk.name
04)------Aggregate: groupBy=[[fd_group_by_pk.id, fd_group_by_pk.name]], aggr=[[sum(CAST(fd_group_by_pk.amount AS Float64))]]
05)--------TableScan: fd_group_by_pk projection=[id, name, amount]

# DISTINCT ON references to functionally-dependent columns also keep
# them available for the post-aggregate distinct grouping.
query TT
EXPLAIN SELECT DISTINCT ON (name) id, SUM(amount)
FROM fd_group_by_pk
GROUP BY id
ORDER BY name
----
logical_plan
01)Projection: first_value(fd_group_by_pk.id) ORDER BY [fd_group_by_pk.name ASC NULLS LAST] AS id, first_value(sum(fd_group_by_pk.amount)) ORDER BY [fd_group_by_pk.name ASC NULLS LAST] AS sum(fd_group_by_pk.amount)
02)--Sort: fd_group_by_pk.name ASC NULLS LAST
03)----Aggregate: groupBy=[[fd_group_by_pk.name]], aggr=[[first_value(fd_group_by_pk.id) ORDER BY [fd_group_by_pk.name ASC NULLS LAST], first_value(sum(fd_group_by_pk.amount)) ORDER BY [fd_group_by_pk.name ASC NULLS LAST]]]
04)------Aggregate: groupBy=[[fd_group_by_pk.id, fd_group_by_pk.name]], aggr=[[sum(CAST(fd_group_by_pk.amount AS Float64))]]
05)--------TableScan: fd_group_by_pk projection=[id, name, amount]

# Ordinal grouping still resolves before dependency pruning.
query TT
EXPLAIN SELECT id, SUM(amount)
FROM fd_group_by_pk
GROUP BY 1
ORDER BY id
----
logical_plan
01)Sort: fd_group_by_pk.id ASC NULLS LAST
02)--Aggregate: groupBy=[[fd_group_by_pk.id]], aggr=[[sum(CAST(fd_group_by_pk.amount AS Float64))]]
03)----TableScan: fd_group_by_pk projection=[id, amount]

statement ok
RESET datafusion.explain.logical_plan_only;
Loading