feat(python/sedonadb): add DataFrame.join#908
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a pandas/Polars/PySpark-style DataFrame.join() API to SedonaDB’s Python DataFrame layer, implementing common-key equi-joins with pandas-shaped output (single copy of join keys) by aliasing both inputs and projecting a de-duplicated schema after the DataFusion join.
Changes:
- Added a Rust
InternalDataFrame::join(...)wrapper that mapshowstrings to DataFusionJoinTypeand callsDataFrame::join. - Added Python
DataFrame.join(other, on, how)that normalizes/validates inputs, performs internal aliasing, and projects to dedupe join keys (includingCOALESCEfor outer joins). - Added a new Python test module covering core join behaviors and input validation.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| python/sedonadb/tests/expr/test_dataframe_join.py | New test suite for DataFrame.join() behavior and validation. |
| python/sedonadb/src/dataframe.rs | Rust-side join binding for Python InternalDataFrame. |
| python/sedonadb/python/sedonadb/dataframe.py | Public DataFrame.join() implementation with alias-and-project dedup logic. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| use datafusion::logical_expr::SortExpr; | ||
| use datafusion::prelude::{DataFrame, SessionContext}; | ||
| use datafusion_common::{Column, DataFusionError, ParamValues}; | ||
| use datafusion_common::{Column, DataFusionError, JoinType, ParamValues}; | ||
| use datafusion_execution::TaskContextProvider; | ||
| use datafusion_expr::{ExplainFormat, ExplainOption, Expr}; |
There was a problem hiding this comment.
Moved in ef61ea6 — JoinType is now imported from datafusion_expr to match the dominant convention in the workspace (8 sites already use that path).
| # that side's key; COALESCE picks the populated one. | ||
| from sedonadb.expr.expression import ScalarUdf | ||
|
|
||
| coalesce_udf = ScalarUdf(self._ctx.scalar_udf("coalesce")) |
There was a problem hiding this comment.
self._ctx in this DataFrame class is actually the internal context handle (_lib.InternalContext), not the user-facing SedonaContext Python class — and InternalContext exposes scalar_udf(name) (added in #885). The outer-join test test_join_outer covers this code path and passes. Added a clarifying comment at the call site in ef61ea6 since the naming is genuinely confusing here.
There was a problem hiding this comment.
For what it's worth I updated this in #901 (it was confusing there, too)
There was a problem hiding this comment.
Thanks — self._ctx.funcs.coalesce(...) now in 608cc69, replacing the InternalContext.scalar_udf dance. Much cleaner.
| elif how in ("right_semi", "right_anti"): | ||
| projection = [_col(c, RIGHT_ALIAS)._impl for c in right_cols] |
There was a problem hiding this comment.
Added test_join_right_semi and test_join_right_anti in ef61ea6, parallel to the existing left-semi/anti cases.
paleolimbot
left a comment
There was a problem hiding this comment.
Thank you!
A few things worth probably worth tackling now. The expr variant should be straightforward/pyspark compatible out of the box:
t = sd.read_parquet(...)
t2 = sd.read_parquet(...)
t.join(t2, on=t.x == t2.x).select(t.a, t.b, t.x, t2.c)| self, | ||
| other: "DataFrame", | ||
| on: Union[str, List[str]], | ||
| how: str = "inner", |
There was a problem hiding this comment.
Can you add a typing hint here (Literal["inner", "left", "right", etc])? This gives a dropdown when typing this in a notebook.
There was a problem hiding this comment.
Added in 608cc69 — Literal["inner", "left", "right", "outer", "full", "left_semi", "semi", "left_anti", "anti", "right_semi", "right_anti"] so the dropdown surfaces both canonical names and the PySpark aliases.
| if c in key_set and how == "right": | ||
| projection.append(_col(c, RIGHT_ALIAS)._impl) |
There was a problem hiding this comment.
I added a getter for getting the qualified column...you can just do right_aliased[c] (below can be simplified too).
There was a problem hiding this comment.
Switched to left_aliased[c] / right_aliased[c] everywhere in the alias-and-project pipeline. The synthesized equi-join predicates are now (left_aliased[k] == right_aliased[k])._impl too — much nicer to read.
| ) | ||
|
|
||
| if isinstance(on, str): | ||
| keys: List[str] = [on] |
There was a problem hiding this comment.
LLMs love to add these type hints but I would love to avoid them (like LLM-generated comments, they can very easily become type hints that are lying)
There was a problem hiding this comment.
Agreed. No internal type annotations on local vars in the new join() body. The public signature keeps its hints (on: Union[str, List[str], Expr, List[Expr]], how: Literal[...]) since those drive the IDE dropdown — happy to drop those too if you'd rather.
| /// Equi-join two DataFrames on one or more common-named columns. | ||
| /// | ||
| /// The Python wrapper handles all input normalization (single-column | ||
| /// `on=str` → one-element list; rejects empty input; rejects | ||
| /// unknown `how` strings). On the Rust side we just: | ||
| /// | ||
| /// 1. Map the `how` string to a `JoinType` enum value. Unrecognized | ||
| /// strings come back from Python — we still match exhaustively | ||
| /// here so the binding fails loudly if the Python wrapper ever | ||
| /// drifts out of sync. | ||
| /// 2. Borrow the column-name vectors as `&[&str]` since DataFusion's | ||
| /// `DataFrame::join` takes that shape. | ||
| /// 3. Pass `filter=None`; an additional residual predicate is the | ||
| /// next sub-PR (expression-based join). | ||
| fn join( | ||
| &self, | ||
| right: &InternalDataFrame, | ||
| join_cols: Vec<String>, | ||
| how: &str, | ||
| ) -> Result<InternalDataFrame, PySedonaError> { |
There was a problem hiding this comment.
It is probably worth only exposing the expression endpoint here to reduce the binding surface (you can easily generate the expression with [left_aliased[c] == right_aliased[c] for c in keys]).
Can you avoid any references to future PRs? These PRs stand nicely on their own.
There was a problem hiding this comment.
Done in 608cc69. The Rust side now exposes only InternalDataFrame::join_on(right, predicates: Vec<PyExpr>, how) — a thin wrapper over DataFusion's DataFrame::join_on. The Python wrapper synthesizes [left_aliased[k] == right_aliased[k] for k in keys] for the column-name path and passes Expr predicates through verbatim. Forward-PR references stripped from both the commit message and the code comments.
| # `outer` returns all rows from both sides; the merged key column has | ||
| # the union of values. | ||
| out = left.join(right, on="k", how="outer").sort("k").to_pandas() | ||
| assert sorted(out["k"].tolist()) == [1, 2, 3] | ||
| assert len(out) == 3 |
There was a problem hiding this comment.
Is there a reason this isn't testing the actual dataframe output?
There was a problem hiding this comment.
Fixed — test_join_outer now asserts the full output via pd.testing.assert_frame_equal, including the COALESCE'd key column and the NULL-bearing value columns on both sides. While here I also added tests for the Expr predicate path (single, list, non-equi) and the PySpark how aliases.
| valid_how = { | ||
| "inner", | ||
| "left", | ||
| "right", | ||
| "outer", | ||
| "left_semi", | ||
| "left_anti", | ||
| "right_semi", | ||
| "right_anti", | ||
| } |
There was a problem hiding this comment.
I don't mind whether we do this or not, but pyspark seems to allow "semi" and "anti" as aliases for "left_semi" and "left_anti", and "full" as an alias for "outer".
There was a problem hiding this comment.
Added — full, semi, anti route to outer, left_semi, left_anti respectively. Mapping is a small dict at the top of join(); test_join_how_pyspark_aliases covers all three.
| def join( | ||
| self, | ||
| other: "DataFrame", | ||
| on: Union[str, List[str], Expr, List[Expr]], | ||
| how: Literal[ |
| left_cols = self._impl.columns() | ||
| right_cols = other._impl.columns() | ||
| left_aliased = self.alias(LEFT_ALIAS) | ||
| right_aliased = other.alias(RIGHT_ALIAS) |
There was a problem hiding this comment.
Added in 3e00905. Pre-flight check builds missing_left / missing_right from on_list and raises a single KeyError naming both sides explicitly, e.g.:
Join keys missing — left: [], right: ['k']. Left columns: ['k', 'v']; right columns: ['j', 'w']
Covered by test_join_key_missing_on_one_side_errors and an updated test_join_unknown_column_errors.
First of the join sub-PRs from apache#791. Cross join is a separate small follow-up. API: df.join(other, on="k") # equi-join on common name df.join(other, on=["k1", "k2"], how="left") # multi-key df.join(other, on="k", how="outer") df.join(other, on=left.k == right.kr) # predicate df.join(other, on=[left.a == right.a, left.b > right.b]) - `on` accepts a column name, a list of column names, a boolean `Expr`, or a list of boolean `Expr`s. - `how`: `inner` (default), `left`, `right`, `outer`, `left_semi`, `left_anti`, `right_semi`, `right_anti`. PySpark aliases `full`, `semi`, `anti` are accepted. Output shape: - **Column names**: result has a single copy of each join key — matches pandas / Polars / PySpark, not DataFusion's keep-both- copies default. - **Predicate `Expr`**: both sides' columns are kept verbatim — same as PySpark's predicate join. Caller controls disambiguation via `df.alias(...)` when the same column name appears on both sides. Rust side: thin wrapper over DataFusion's `DataFrame::join_on`. Maps the `how` string to `JoinType`; takes a `Vec<PyExpr>` of predicates. Python wrapper for the column-name path synthesizes equi-join predicates from aliased qualified columns and projects the result to dedupe the join keys: 1. Alias both sides internally with sentinel qualifiers so the merged schema has no qualified-name collisions. 2. Build `aliased_left[k] == aliased_right[k]` predicates and call `join_on`. 3. Project with fully qualified refs: left's full column list plus right's non-key columns. The unified join key comes from the left for inner/left, the right for right joins, and via `COALESCE(left.k, right.k)` for outer joins. 4. Semi/anti joins skip the projection logic — DataFusion already drops the right (or left) columns. Tests: 20 covering single/multi-key inner, left, right, outer, all four semi/anti variants, PySpark how aliases, Expr predicate (single, list, non-equi), lazy return, and type / empty / mixed / bad-how / unknown-column error paths. Limitations to follow up later: non-key column-name collisions between left and right are not auto-suffixed (`_x`/`_y` like pandas); the duplicate names propagate and become ambiguous to reference.
First of the join sub-PRs from #791.
cross_joinis a separate small follow-up.API
onaccepts a column name (str), a list of column names, a booleanExpr, or a list of booleanExprs combined with logical AND.how:inner(default),left,right,outer,left_semi,left_anti,right_semi,right_anti. PySpark aliases also accepted:full→ outer,semi→ left_semi,anti→ left_anti.Output shape
on="k"/on=["k1", "k2"]): result has a single copy of each join key — matches pandas / Polars / PySpark, not DataFusion's keep-both-copies default.Expr(on=left.k == right.k): both sides' columns are kept verbatim — same as PySpark's predicate join. Caller controls disambiguation viadf.alias(...)when the same column name appears on both sides.Worth flagging — the auto-dedup machinery (column-name path)
DataFusion's DataFrame join has two behaviors that diverge from user expectations:
?table?qualifier; the merged schema has unresolvable collisions before the join even runs.USINGparser does the dedup at parse time; the DataFrame API doesn't.For the column-name path, the Python wrapper:
_sd_join_left_/_sd_join_right_).aliased_left[k] == aliased_right[k]predicates and callsjoin_on.The unified join-key column:
COALESCE(left.k, right.k)— picks the populated side for rows unmatched on either input.For the predicate path, none of this dedup machinery runs — the Exprs flow straight through to
DataFrame::join_on.Implementation
python/sedonadb/src/dataframe.rsInternalDataFrame::join_on(right, predicates, how). Thin wrapper over DataFusion'sDataFrame::join_on. Maps thehowstring toJoinType; takes aVec<PyExpr>of predicates.python/sedonadb/python/sedonadb/dataframe.pyDataFrame.join(other, on, how). Routes the column-name path through the alias-and-project pipeline; passes Expr predicates straight through.Test plan
20 tests in
tests/expr/test_dataframe_join.py:howaliases:semi/anti/full.left.x < right.y) as a spatial-join shape analogue.isinstance(out, DataFrame).other; badontype; emptyonlist; mixed str / non-str (or str / Expr) inonlist; invalidhow; unknown column (caught pre-flight asKeyError).All output assertions use exact
pd.testing.assert_frame_equalafter sorting.Local: 20 unit + 24 doctests + 187 expr-dir tests +
ruff format+ruff check+cargo fmt --checkall clean.Known limitations (for follow-up)
_x/_ylike pandas). The duplicate names propagate and become ambiguous to reference. Deferred to a later PR per the design discussion.