Skip to content

proto: serialize dynamic filters on Sort, Aggregate, HashJoin plan nodes#22011

Open
jayshrivastava wants to merge 3 commits intoapache:mainfrom
jayshrivastava:js/serialize-dynamic-filters-in-execution-plans-2
Open

proto: serialize dynamic filters on Sort, Aggregate, HashJoin plan nodes#22011
jayshrivastava wants to merge 3 commits intoapache:mainfrom
jayshrivastava:js/serialize-dynamic-filters-in-execution-plans-2

Conversation

@jayshrivastava
Copy link
Copy Markdown
Contributor

@jayshrivastava jayshrivastava commented May 4, 2026

Which issue does this PR close?

Rationale for this change

SortExec, AggregateExec, and HashJoinExec do not serialize their dynamic filters, so plans lose dynamic filtering when they are serialized and sent across network boundaries.

What changes are included in this PR?

This change adds with_dynamic_filter_expr() and dynamic_filter_expr() to SortExec, AggregateExec, and HashJoinExec.

pub fn with_dynamic_filter_expr(
        mut self,
        filter: Arc<DynamicFilterPhysicalExpr>,
) -> Result<Self> 

pub fn dynamic_filter_expr(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {

This are used as getters and setters for the proto crate to get and set dynamic filters.

Are these changes tested?

Yes. See datafusion/datafusion/proto/tests/cases/roundtrip_physical_plan.rs. There are also tests for the plan nodes in the physical-plan crate.

Are there any user-facing changes?

SortExec, AggregateExec, and HashJoinExec now roundtrip serialize dynamic filter expressions.

@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates core Core DataFusion crate proto Related to proto crate physical-plan Changes to the physical-plan crate labels May 4, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 4, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion v53.1.0 (current)
       Built [  83.361s] (current)
     Parsing datafusion v53.1.0 (current)
      Parsed [   0.035s] (current)
    Building datafusion v53.1.0 (baseline)
       Built [  80.539s] (baseline)
     Parsing datafusion v53.1.0 (baseline)
      Parsed [   0.034s] (baseline)
    Checking datafusion v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.643s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [ 166.242s] datafusion
    Building datafusion-physical-expr v53.1.0 (current)
       Built [  24.242s] (current)
     Parsing datafusion-physical-expr v53.1.0 (current)
      Parsed [   0.041s] (current)
    Building datafusion-physical-expr v53.1.0 (baseline)
       Built [  24.156s] (baseline)
     Parsing datafusion-physical-expr v53.1.0 (baseline)
      Parsed [   0.044s] (baseline)
    Checking datafusion-physical-expr v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.296s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  49.908s] datafusion-physical-expr
    Building datafusion-physical-expr-common v53.1.0 (current)
       Built [  19.498s] (current)
     Parsing datafusion-physical-expr-common v53.1.0 (current)
      Parsed [   0.019s] (current)
    Building datafusion-physical-expr-common v53.1.0 (baseline)
       Built [  19.428s] (baseline)
     Parsing datafusion-physical-expr-common v53.1.0 (baseline)
      Parsed [   0.020s] (baseline)
    Checking datafusion-physical-expr-common v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.190s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  39.955s] datafusion-physical-expr-common
    Building datafusion-physical-plan v53.1.0 (current)
       Built [  31.836s] (current)
     Parsing datafusion-physical-plan v53.1.0 (current)
      Parsed [   0.121s] (current)
    Building datafusion-physical-plan v53.1.0 (baseline)
       Built [  31.409s] (baseline)
     Parsing datafusion-physical-plan v53.1.0 (baseline)
      Parsed [   0.122s] (baseline)
    Checking datafusion-physical-plan v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.593s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  65.279s] datafusion-physical-plan
    Building datafusion-proto v53.1.0 (current)
       Built [  52.505s] (current)
     Parsing datafusion-proto v53.1.0 (current)
      Parsed [   0.130s] (current)
    Building datafusion-proto v53.1.0 (baseline)
       Built [  52.182s] (baseline)
     Parsing datafusion-proto v53.1.0 (baseline)
      Parsed [   0.133s] (baseline)
    Checking datafusion-proto v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   1.709s] 222 checks: 221 pass, 1 fail, 0 warn, 30 skip

--- failure constructible_struct_adds_field: externally-constructible struct adds field ---

Description:
A pub struct constructible with a struct literal has a new pub field. Existing struct literals must be updated to include the new field.
        ref: https://doc.rust-lang.org/reference/expressions/struct-expr.html
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.47.0/src/lints/constructible_struct_adds_field.ron

Failed in:
  field AggregateExecNode.dynamic_filter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1959
  field AggregateExecNode.dynamic_filter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1959
  field HashJoinExecNode.dynamic_filter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1777
  field HashJoinExecNode.dynamic_filter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1777
  field SortExecNode.dynamic_filter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1992
  field SortExecNode.dynamic_filter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1992

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [ 108.491s] datafusion-proto

@github-actions github-actions Bot added the auto detected api change Auto detected API change label May 4, 2026
Builds on the prior `DynamicFilterPhysicalExpr` proto serialization +
dedupe work so plan-node references to a shared dynamic filter survive
roundtrip.

- Adds `dynamic_filter` to the proto messages for `SortExec`,
  `AggregateExec`, and `HashJoinExec` and wires it through
  to/from-proto.
- Exposes `dynamic_filter()` / `with_dynamic_filter()` on those plan
  nodes so the dedupe deserializer can reattach the shared
  `DynamicFilterPhysicalExpr` after roundtrip.
- Extracts `supported_accumulators_info()` on `AggregateExec` and uses
  it from `init_dynamic_filter` and `with_dynamic_filter`.
- Adds `test_hash_join_with_dynamic_filter_roundtrip`,
  `test_aggregate_with_dynamic_filter_roundtrip`, and
  `test_sort_topk_with_dynamic_filter_roundtrip` to verify that the
  plan node and the pushdown-target `ParquetSource` predicate end up
  pointing at the same `expression_id` after roundtrip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jayshrivastava
Copy link
Copy Markdown
Contributor Author

jayshrivastava commented May 4, 2026

We may alternatively use map_expressions (unmerged, see #20899) to set dynamic filters or apply_expressions (merged, see #20337) to get dynamic filters.

I'll leave it up to reviewers for thoughts on this. This is the apply_expressions implementation for AggregateExec.

fn apply_expressions(
        &self,
        f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
    ) -> Result<TreeNodeRecursion> {
        // Apply to group by expressions
        let mut tnr = TreeNodeRecursion::Continue;
        for expr in self.group_by.input_exprs() {
            tnr = tnr.visit_sibling(|| f(expr.as_ref()))?;
        }

        // Apply to aggregate expressions
        for aggr in self.aggr_expr.iter() {
            for expr in aggr.expressions() {
                tnr = tnr.visit_sibling(|| f(expr.as_ref()))?;
            }
        }

        // Apply to filter expressions (FILTER WHERE clauses)
        for filter in self.filter_expr.iter().flatten() {
            tnr = tnr.visit_sibling(|| f(filter.as_ref()))?;
        }

        // Apply to dynamic filter expression if present
        if let Some(dyn_filter) = &self.dynamic_filter {
            tnr = tnr.visit_sibling(|| f(dyn_filter.filter.as_ref()))?;
        }

        Ok(tnr)
    }

We currently expose all these expressions via public methods

  /// Grouping expressions
    pub fn group_expr(&self) -> &PhysicalGroupBy {
        &self.group_by
    }

    /// Grouping expressions as they occur in the output schema
    pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        self.group_by.output_exprs()
    }

    /// Aggregate expressions
    pub fn aggr_expr(&self) -> &[Arc<AggregateFunctionExpr>] {
        &self.aggr_expr
    }

    /// FILTER (WHERE clause) expression for each aggregate expression
    pub fn filter_expr(&self) -> &[Option<Arc<dyn PhysicalExpr>>] {
        &self.filter_expr
    }

I continued this pattern by adding pub fn dynamic_filter_expr(). I didn't use apply_expressions in the proto crate for dynamic filters because

  1. We would have to traverse all other filters and downcast
  2. The proto crate does not use apply_expressions to get expressions for serialization
  3. All getters and setters will go away after Split proto serialization to encapsulate private state #21835

If folks would still prefer using apply_expressions and map_expressions before merging this PR, let me know!

@jayshrivastava jayshrivastava changed the title proto: serialize dynamic filters on Sort, Aggregate, HashJoin proto: serialize dynamic filters on Sort, Aggregate, HashJoin plan nodes May 4, 2026
@jayshrivastava jayshrivastava force-pushed the js/serialize-dynamic-filters-in-execution-plans-2 branch from 0f5ea55 to 334ca91 Compare May 4, 2026 20:28
@jayshrivastava jayshrivastava marked this pull request as ready for review May 4, 2026 20:33
@jayshrivastava
Copy link
Copy Markdown
Contributor Author

cc @adriangb @LiaCastaneda let me know what you think about this one! (Lia is on vacation this week but I tagged her for when she's back)

Copy link
Copy Markdown
Contributor

@stuhood stuhood left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed everything but aggregates (don't know anything about how filters are used there), and this looks good to me. Thanks!

Comment on lines +1313 to +1315
if let Ok(df) = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
.downcast::<DynamicFilterPhysicalExpr>()
{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this error if it deserializes something unexpected? Ditto elsewhere.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TYFR! Sorry I missed the @ btw. Will make sure to tag you next time 😄

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah let's error. I'll push an update

@adriangb
Copy link
Copy Markdown
Contributor

adriangb commented May 4, 2026

Argh I hate that we keep having to leak more and more internal state to the public API just for serialization. Maybe we should just plow on as is but I think we as a community seriously have to consider #21949. We should discuss next sync.

@jayshrivastava
Copy link
Copy Markdown
Contributor Author

@adriangb Do you feel strongly about this PR specifically? I don't think there's any work remaining for dynamic filter round-tripping.

I'm on-board with #21949 and can find time to review it this week. I don't think we have a sync planned, so I can post in #21207 (comment) to schedule one if you would prefer meeting about it first!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto detected api change Auto detected API change core Core DataFusion crate physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Serialize dynamic filters across network boundaries

3 participants