feat(python/sedonadb): add DataFrame.join #908

Open
jiayuasu wants to merge 1 commit into apache:main from jiayuasu:feature/df-join

Conversation

Member

@jiayuasu jiayuasu commented Jun 3, 2026

First of the join sub-PRs from #791. cross_join is a separate small follow-up.

API

df.join(other, on="k")                              # equi-join on common name
df.join(other, on=["k1", "k2"], how="left")         # multi-key
df.join(other, on="k", how="outer")
df.join(other, on=left.k == right.kr)               # arbitrary predicate
df.join(other, on=[left.a == right.a, left.b > right.b])
  • on accepts a column name (str), a list of column names, a boolean Expr, or a list of boolean Exprs combined with logical AND.
  • how: inner (default), left, right, outer, left_semi, left_anti, right_semi, right_anti. PySpark aliases also accepted: full → outer, semi → left_semi, anti → left_anti.
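
The four accepted shapes of `on` can be told apart with a small normalization step. The following is a minimal sketch, not the PR's code: the `Expr` class is a hypothetical stand-in for sedonadb's expression type, `classify_on` is an illustrative name, and the exception types are assumptions.

```python
class Expr:
    """Hypothetical stand-in for sedonadb's Expr type."""

    def __init__(self, text):
        self.text = text


def classify_on(on):
    """Return ("names", [...]) or ("exprs", [...]); raise on bad input."""
    # A bare column name or a bare Expr is treated as a one-element list.
    if isinstance(on, (str, Expr)):
        on = [on]
    if not isinstance(on, (list, tuple)):
        raise TypeError(f"on must be a str, Expr, or list, got {type(on).__name__}")
    if len(on) == 0:
        raise ValueError("on must not be empty")
    if all(isinstance(item, str) for item in on):
        return "names", list(on)  # column-name path
    if all(isinstance(item, Expr) for item in on):
        return "exprs", list(on)  # predicate path, combined with AND
    raise TypeError("on must be all column names or all Exprs, not a mix")
```

Deciding the path once, up front, keeps the dedup logic for column names separate from the pass-through logic for predicates.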

Output shape

  • Column names (on="k" / on=["k1", "k2"]): result has a single copy of each join key — matches pandas / Polars / PySpark, not DataFusion's keep-both-copies default.
  • Predicate Expr (on=left.k == right.k): both sides' columns are kept verbatim — same as PySpark's predicate join. Caller controls disambiguation via df.alias(...) when the same column name appears on both sides.

Worth flagging — the auto-dedup machinery (column-name path)

DataFusion's DataFrame join has two behaviors that diverge from user expectations:

  1. Rejects two unaliased inputs that share column names, failing with a "duplicate qualified field name" error. Both inputs default to the ?table? qualifier, so the merged schema has unresolvable collisions before the join even runs.
  2. Keeps both copies of the join key in the result when inputs are aliased. The SQL USING parser does the dedup at parse time; the DataFrame API doesn't.
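
A toy model of the two behaviors, treating a schema as a list of (qualifier, name) pairs. This is only an illustration of the collision described above, not DataFusion code; `qualified_collisions` and its parameters are hypothetical names.

```python
from collections import Counter

DEFAULT_QUALIFIER = "?table?"  # qualifier named in the description above


def qualified_collisions(left_cols, right_cols,
                         left_q=DEFAULT_QUALIFIER, right_q=DEFAULT_QUALIFIER):
    """Return the (qualifier, name) pairs that occur more than once."""
    merged = [(left_q, c) for c in left_cols] + [(right_q, c) for c in right_cols]
    counts = Counter(merged)
    return sorted(pair for pair, n in counts.items() if n > 1)


# Behavior 1: unaliased inputs collide on the shared name.
unaliased = qualified_collisions(["k", "v"], ["k", "w"])
# Behavior 2: distinct aliases remove the collision, but both "k" columns remain.
aliased = qualified_collisions(["k", "v"], ["k", "w"], "l", "r")
```

In this model `unaliased` contains the colliding pair for `k`, while `aliased` is empty, which is why the wrapper aliases first and dedupes afterwards.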

For the column-name path, the Python wrapper:

  1. Aliases both sides internally with sentinel qualifiers (_sd_join_left_ / _sd_join_right_).
  2. Synthesizes aliased_left[k] == aliased_right[k] predicates and calls join_on.
  3. Projects with fully qualified column refs to dedupe the key columns and strip the sentinel qualifiers from the output.
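
Step 3 can be sketched as a function that builds the projection list. This is an assumption-laden illustration: the sentinel alias values come from the description above, but `projection_plan` and the SQL-like strings it returns are hypothetical; the real wrapper builds expression objects.

```python
LEFT_ALIAS = "_sd_join_left_"
RIGHT_ALIAS = "_sd_join_right_"


def ref(alias, col):
    """Qualified reference, renamed back to the bare column name."""
    return f"{alias}.{col} AS {col}"


def projection_plan(left_cols, right_cols, keys, how):
    key_set = set(keys)
    # Semi/anti joins only ever expose one side's columns.
    if how in ("left_semi", "left_anti"):
        return [ref(LEFT_ALIAS, c) for c in left_cols]
    if how in ("right_semi", "right_anti"):
        return [ref(RIGHT_ALIAS, c) for c in right_cols]
    plan = []
    for c in left_cols:
        if c in key_set and how == "right":
            plan.append(ref(RIGHT_ALIAS, c))  # left key may be NULL
        elif c in key_set and how == "outer":
            plan.append(f"COALESCE({LEFT_ALIAS}.{c}, {RIGHT_ALIAS}.{c}) AS {c}")
        else:
            plan.append(ref(LEFT_ALIAS, c))
    # Right side contributes only its non-key columns, so keys appear once.
    plan.extend(ref(RIGHT_ALIAS, c) for c in right_cols if c not in key_set)
    return plan
```

Because every output column is renamed to its bare name, the sentinel qualifiers never leak into the result schema.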

The unified join-key column:

  • inner / left: sourced from the left side (always populated).
  • right: sourced from the right side (left may be NULL for unmatched rows).
  • outer: COALESCE(left.k, right.k) — picks the populated side for rows unmatched on either input.
  • left_semi / left_anti / right_semi / right_anti: only one side's columns appear; projection is straightforward.
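
To see why the outer case needs COALESCE, here is a small pure-Python model of a full outer join on a single key. It assumes keys are unique on each side and uses `None` for NULL; `coalesce` and `outer_join_pairs` are hypothetical helper names.

```python
def coalesce(*values):
    """Return the first value that is not None, mirroring SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None


def outer_join_pairs(left_keys, right_keys):
    """Pair keys as a full outer join would: (left_key, right_key)."""
    pairs = [(k, k if k in right_keys else None) for k in left_keys]
    pairs += [(None, k) for k in right_keys if k not in left_keys]
    return pairs


pairs = outer_join_pairs([1, 2], [2, 3])
unified = [coalesce(left, right) for left, right in pairs]
```

Taking the key from the left side alone would yield NULL for the row that exists only on the right, and vice versa; COALESCE picks whichever side is populated.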

For the predicate path, none of this dedup machinery runs — the Exprs flow straight through to DataFrame::join_on.

Implementation

| File | Change |
| --- | --- |
| python/sedonadb/src/dataframe.rs | New InternalDataFrame::join_on(right, predicates, how). Thin wrapper over DataFusion's DataFrame::join_on. Maps the how string to JoinType; takes a Vec<PyExpr> of predicates. |
| python/sedonadb/python/sedonadb/dataframe.py | New DataFrame.join(other, on, how). Routes the column-name path through the alias-and-project pipeline; passes Expr predicates straight through. |

Test plan

20 tests in tests/expr/test_dataframe_join.py:

  • Positive (column-name path): single-key inner / left / right / outer; multi-key inner; left_semi; left_anti; right_semi; right_anti.
  • PySpark how aliases: semi / anti / full.
  • Positive (predicate path): single Expr equi-join; list of Exprs combined with AND; non-equi predicate (left.x < right.y) as a spatial-join shape analogue.
  • Lazy return: isinstance(out, DataFrame).
  • Errors: non-DataFrame other; bad on type; empty on list; mixed str / non-str (or str / Expr) in on list; invalid how; unknown column (caught pre-flight as KeyError).

All output assertions use exact pd.testing.assert_frame_equal after sorting.

Local: 20 unit + 24 doctests + 187 expr-dir tests + ruff format + ruff check + cargo fmt --check all clean.

Known limitations (for follow-up)

  • Non-key column-name collisions in the column-name path are not auto-suffixed (_x / _y like pandas). The duplicate names propagate and become ambiguous to reference. Deferred to a later PR per the design discussion.
  • Self-join requires the user to alias one side explicitly. The internal sentinel aliasing handles the unaliased common case but doesn't disambiguate a self-join.

Contributor

Copilot AI left a comment

Pull request overview

Adds a pandas/Polars/PySpark-style DataFrame.join() API to SedonaDB’s Python DataFrame layer, implementing common-key equi-joins with pandas-shaped output (single copy of join keys) by aliasing both inputs and projecting a de-duplicated schema after the DataFusion join.

Changes:

  • Added a Rust InternalDataFrame::join(...) wrapper that maps how strings to DataFusion JoinType and calls DataFrame::join.
  • Added Python DataFrame.join(other, on, how) that normalizes/validates inputs, performs internal aliasing, and projects to dedupe join keys (including COALESCE for outer joins).
  • Added a new Python test module covering core join behaviors and input validation.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| python/sedonadb/tests/expr/test_dataframe_join.py | New test suite for DataFrame.join() behavior and validation. |
| python/sedonadb/src/dataframe.rs | Rust-side join binding for Python InternalDataFrame. |
| python/sedonadb/python/sedonadb/dataframe.py | Public DataFrame.join() implementation with alias-and-project dedup logic. |

Comment thread python/sedonadb/src/dataframe.rs Outdated
Comment on lines 27 to 31
  use datafusion::logical_expr::SortExpr;
  use datafusion::prelude::{DataFrame, SessionContext};
- use datafusion_common::{Column, DataFusionError, ParamValues};
+ use datafusion_common::{Column, DataFusionError, JoinType, ParamValues};
  use datafusion_execution::TaskContextProvider;
  use datafusion_expr::{ExplainFormat, ExplainOption, Expr};
Member Author

Moved in ef61ea6: JoinType is now imported from datafusion_expr to match the dominant convention in the workspace (8 sites already use that path).

# that side's key; COALESCE picks the populated one.
from sedonadb.expr.expression import ScalarUdf

coalesce_udf = ScalarUdf(self._ctx.scalar_udf("coalesce"))
Member Author

self._ctx in this DataFrame class is actually the internal context handle (_lib.InternalContext), not the user-facing SedonaContext Python class — and InternalContext exposes scalar_udf(name) (added in #885). The outer-join test test_join_outer covers this code path and passes. Added a clarifying comment at the call site in ef61ea6 since the naming is genuinely confusing here.

Member

For what it's worth I updated this in #901 (it was confusing there, too)

Member Author

@jiayuasu jiayuasu Jun 5, 2026

Thanks — self._ctx.funcs.coalesce(...) now in 608cc69, replacing the InternalContext.scalar_udf dance. Much cleaner.

Comment on lines +745 to +746
elif how in ("right_semi", "right_anti"):
    projection = [_col(c, RIGHT_ALIAS)._impl for c in right_cols]
Member Author

Added test_join_right_semi and test_join_right_anti in ef61ea6, parallel to the existing left-semi/anti cases.

Member

@paleolimbot paleolimbot left a comment

Thank you!

A few things probably worth tackling now. The expr variant should be straightforward/pyspark compatible out of the box:

t = sd.read_parquet(...)
t2 = sd.read_parquet(...)
t.join(t2, on=t.x == t2.x).select(t.a, t.b, t.x, t2.c)

self,
other: "DataFrame",
on: Union[str, List[str]],
how: str = "inner",
Member

Can you add a typing hint here (Literal["inner", "left", "right", etc])? This gives a dropdown when typing this in a notebook.

Member Author

@jiayuasu jiayuasu Jun 5, 2026

Added in 608cc69: Literal["inner", "left", "right", "outer", "full", "left_semi", "semi", "left_anti", "anti", "right_semi", "right_anti"] so the dropdown surfaces both canonical names and the PySpark aliases.
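
For readers unfamiliar with the pattern, a minimal sketch of such a hint follows. The alias name `JoinHow` and the free-standing `join` stub are hypothetical; only the list of literal values comes from the comment above.

```python
from typing import Literal, get_args

# Hypothetical alias name; the values mirror the list quoted above.
JoinHow = Literal[
    "inner", "left", "right", "outer", "full",
    "left_semi", "semi", "left_anti", "anti",
    "right_semi", "right_anti",
]


def join(other, on, how: JoinHow = "inner"):
    """Signature stub only: editors read the Literal to offer completions."""
    return how


# The same values are available at runtime if they are needed for validation.
accepted = get_args(JoinHow)
```

The hint is advisory: Python does not enforce it at runtime, so a runtime check on `how` is still needed.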

Comment on lines +756 to +757
if c in key_set and how == "right":
    projection.append(_col(c, RIGHT_ALIAS)._impl)
Member

I added a getter for getting the qualified column...you can just do right_aliased[c] (below can be simplified too).

Member Author

Switched to left_aliased[c] / right_aliased[c] everywhere in the alias-and-project pipeline. The synthesized equi-join predicates are now (left_aliased[k] == right_aliased[k])._impl too — much nicer to read.

)

if isinstance(on, str):
    keys: List[str] = [on]
Member

LLMs love to add these type hints but I would love to avoid them (like LLM-generated comments, they can very easily become type hints that are lying)

Member Author

Agreed. No internal type annotations on local vars in the new join() body. The public signature keeps its hints (on: Union[str, List[str], Expr, List[Expr]], how: Literal[...]) since those drive the IDE dropdown — happy to drop those too if you'd rather.

Comment thread python/sedonadb/src/dataframe.rs Outdated
Comment on lines +260 to +279
/// Equi-join two DataFrames on one or more common-named columns.
///
/// The Python wrapper handles all input normalization (single-column
/// `on=str` → one-element list; rejects empty input; rejects
/// unknown `how` strings). On the Rust side we just:
///
/// 1. Map the `how` string to a `JoinType` enum value. Unrecognized
/// strings come back from Python — we still match exhaustively
/// here so the binding fails loudly if the Python wrapper ever
/// drifts out of sync.
/// 2. Borrow the column-name vectors as `&[&str]` since DataFusion's
/// `DataFrame::join` takes that shape.
/// 3. Pass `filter=None`; an additional residual predicate is the
/// next sub-PR (expression-based join).
fn join(
    &self,
    right: &InternalDataFrame,
    join_cols: Vec<String>,
    how: &str,
) -> Result<InternalDataFrame, PySedonaError> {
Member

It is probably worth only exposing the expression endpoint here to reduce the binding surface (you can easily generate the expression with [left_aliased[c] == right_aliased[c] for c in keys]).

Can you avoid any references to future PRs? These PRs stand nicely on their own.

Member Author

Done in 608cc69. The Rust side now exposes only InternalDataFrame::join_on(right, predicates: Vec<PyExpr>, how) — a thin wrapper over DataFusion's DataFrame::join_on. The Python wrapper synthesizes [left_aliased[k] == right_aliased[k] for k in keys] for the column-name path and passes Expr predicates through verbatim. Forward-PR references stripped from both the commit message and the code comments.
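
A rough model of how the predicate synthesis reads, using stand-in classes rather than the real sedonadb types. `AliasedFrame`, `QualifiedColumn`, and `equi_predicates` are hypothetical names, and the predicates are plain strings here instead of expression objects.

```python
LEFT_ALIAS = "_sd_join_left_"
RIGHT_ALIAS = "_sd_join_right_"


class QualifiedColumn:
    def __init__(self, qualifier, name):
        self.qualifier = qualifier
        self.name = name

    def __str__(self):
        return f"{self.qualifier}.{self.name}"

    def __eq__(self, other):
        # Build a predicate description instead of comparing objects,
        # the way expression libraries overload ==.
        return f"{self} = {other}"


class AliasedFrame:
    def __init__(self, alias, columns):
        self.alias = alias
        self.columns = list(columns)

    def __getitem__(self, name):
        # Qualified-column getter: frame["k"] -> alias.k
        if name not in self.columns:
            raise KeyError(name)
        return QualifiedColumn(self.alias, name)


def equi_predicates(left_aliased, right_aliased, keys):
    return [left_aliased[k] == right_aliased[k] for k in keys]


left_aliased = AliasedFrame(LEFT_ALIAS, ["k1", "k2", "v"])
right_aliased = AliasedFrame(RIGHT_ALIAS, ["k1", "k2", "w"])
predicates = equi_predicates(left_aliased, right_aliased, ["k1", "k2"])
```

With a single expression-based entry point, the column-name path becomes a thin layer of Python on top of the same binding the predicate path uses.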

Comment on lines +76 to +80
# `outer` returns all rows from both sides; the merged key column has
# the union of values.
out = left.join(right, on="k", how="outer").sort("k").to_pandas()
assert sorted(out["k"].tolist()) == [1, 2, 3]
assert len(out) == 3
Member

Is there a reason this isn't testing the actual dataframe output?

Member Author

Fixed — test_join_outer now asserts the full output via pd.testing.assert_frame_equal, including the COALESCE'd key column and the NULL-bearing value columns on both sides. While here I also added tests for the Expr predicate path (single, list, non-equi) and the PySpark how aliases.

Comment on lines +707 to +716
valid_how = {
    "inner",
    "left",
    "right",
    "outer",
    "left_semi",
    "left_anti",
    "right_semi",
    "right_anti",
}
Member

I don't mind whether we do this or not, but pyspark seems to allow "semi" and "anti" as aliases for "left_semi" and "left_anti", and "full" as an alias for "outer".

Member Author

Added — full, semi, anti route to outer, left_semi, left_anti respectively. Mapping is a small dict at the top of join(); test_join_how_pyspark_aliases covers all three.
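
A sketch of what such a mapping might look like. The alias pairs and the eight canonical names come from this thread; the names `HOW_ALIASES`, `VALID_HOW`, and `canonical_how`, and the choice of `ValueError`, are assumptions.

```python
# PySpark-style aliases mapped to the canonical join types.
HOW_ALIASES = {"full": "outer", "semi": "left_semi", "anti": "left_anti"}

VALID_HOW = {
    "inner", "left", "right", "outer",
    "left_semi", "left_anti", "right_semi", "right_anti",
}


def canonical_how(how):
    """Resolve an alias, then reject anything that is not a known join type."""
    resolved = HOW_ALIASES.get(how, how)
    if resolved not in VALID_HOW:
        raise ValueError(
            f"Unknown join type {how!r}; expected one of {sorted(VALID_HOW)}"
        )
    return resolved
```

Resolving aliases before validation means the rest of `join()` only ever sees the eight canonical names.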

@jiayuasu jiayuasu requested a review from Copilot June 5, 2026 18:25
@jiayuasu jiayuasu changed the title from "feat(python/sedonadb): add DataFrame.join (common-key equi-join)" to "feat(python/sedonadb): add DataFrame.join" Jun 5, 2026
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment on lines +637 to +641
def join(
    self,
    other: "DataFrame",
    on: Union[str, List[str], Expr, List[Expr]],
    how: Literal[
Comment on lines +753 to +756
left_cols = self._impl.columns()
right_cols = other._impl.columns()
left_aliased = self.alias(LEFT_ALIAS)
right_aliased = other.alias(RIGHT_ALIAS)
Member Author

Added in 3e00905. Pre-flight check builds missing_left / missing_right from on_list and raises a single KeyError naming both sides explicitly, e.g.:

Join keys missing — left: [], right: ['k']. Left columns: ['k', 'v']; right columns: ['j', 'w']

Covered by test_join_key_missing_on_one_side_errors and an updated test_join_unknown_column_errors.
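
The check described here could look roughly like the following. This is a sketch under assumptions: `check_join_keys` is a hypothetical name, and the message is modeled on the example above with simplified punctuation rather than copied from the code.

```python
def check_join_keys(on_list, left_cols, right_cols):
    """Raise one KeyError that reports missing keys for both sides."""
    missing_left = [k for k in on_list if k not in left_cols]
    missing_right = [k for k in on_list if k not in right_cols]
    if missing_left or missing_right:
        raise KeyError(
            f"Join keys missing: left: {missing_left}, right: {missing_right}. "
            f"Left columns: {list(left_cols)}; right columns: {list(right_cols)}"
        )
```

Checking both sides before aliasing lets the error name the user's own columns instead of surfacing a message that mentions the internal sentinel qualifiers.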

First of the join sub-PRs from apache#791. Cross join is a separate small
follow-up.

API:

    df.join(other, on="k")                              # equi-join on common name
    df.join(other, on=["k1", "k2"], how="left")         # multi-key
    df.join(other, on="k", how="outer")
    df.join(other, on=left.k == right.kr)               # predicate
    df.join(other, on=[left.a == right.a, left.b > right.b])

- `on` accepts a column name, a list of column names, a boolean
  `Expr`, or a list of boolean `Expr`s.
- `how`: `inner` (default), `left`, `right`, `outer`, `left_semi`,
  `left_anti`, `right_semi`, `right_anti`. PySpark aliases `full`,
  `semi`, `anti` are accepted.

Output shape:

- **Column names**: result has a single copy of each join key —
  matches pandas / Polars / PySpark, not DataFusion's keep-both-
  copies default.
- **Predicate `Expr`**: both sides' columns are kept verbatim — same
  as PySpark's predicate join. Caller controls disambiguation via
  `df.alias(...)` when the same column name appears on both sides.

Rust side: thin wrapper over DataFusion's `DataFrame::join_on`.
Maps the `how` string to `JoinType`; takes a `Vec<PyExpr>` of
predicates.

Python wrapper for the column-name path synthesizes equi-join
predicates from aliased qualified columns and projects the result
to dedupe the join keys:

  1. Alias both sides internally with sentinel qualifiers so the
     merged schema has no qualified-name collisions.
  2. Build `aliased_left[k] == aliased_right[k]` predicates and call
     `join_on`.
  3. Project with fully qualified refs: left's full column list plus
     right's non-key columns. The unified join key comes from the
     left for inner/left, the right for right joins, and via
     `COALESCE(left.k, right.k)` for outer joins.
  4. Semi/anti joins skip the projection logic — DataFusion already
     drops the right (or left) columns.

Tests: 20 covering single/multi-key inner, left, right, outer,
all four semi/anti variants, PySpark how aliases, Expr predicate
(single, list, non-equi), lazy return, and type / empty / mixed /
bad-how / unknown-column error paths.

Limitations to follow up later: non-key column-name collisions
between left and right are not auto-suffixed (`_x`/`_y` like
pandas); the duplicate names propagate and become ambiguous to
reference.
@jiayuasu jiayuasu marked this pull request as ready for review June 7, 2026 05:24
3 participants