
feat: TABLESAMPLE SYSTEM end-to-end + row-group / row sampling on ParquetSource#22000

Open
adriangb wants to merge 6 commits into apache:main from pydantic:parquet-sampling

Conversation

@adriangb
Contributor

@adriangb adriangb commented May 3, 2026

Which issue does this PR close?

Rationale for this change

DataFusion has the machinery for fine-grained parquet sampling (ParquetAccessPlan with Skip / Scan / Selection(RowSelection)) but no public way to ask for a sample without constructing the access plan by hand and stuffing it into PartitionedFile.extensions, and no SQL surface at all. That works for one-off code but is awkward for:

  • TABLESAMPLE SQL — most users want a sample by writing SELECT … FROM t TABLESAMPLE SYSTEM (10), not by reaching into ParquetSource builders.
  • Ad-hoc data exploration — "give me 1% of this dataset" via the CLI / DataFrame API.
  • Layered helpers that want to compute approximate stats over a bounded slice of data without scanning everything (e.g. an optimizer-fed sampled-stats helper — see the linked POC).
  • EXPLAIN ANALYZE-driven debug runs against a representative slice instead of the full table.

Two prior in-core attempts at TABLESAMPLE were closed without merging on semantics-fragmentation grounds: #16505 (Spark-style row-level + Poisson) and #16325 (random()-rewrite, which also hit a correctness bug from random() getting pushed into the parquet executor — fixed separately in #16545). The discussion in #13563 settled on "use the RelationPlanner extension API" as the way forward, and #17843 (merged Dec 9 2025) shipped that API specifically with TABLESAMPLE as the canonical motivating example. The blog post Extending SQL in DataFusion: from ->> to TABLESAMPLE documents that pattern.

This PR adds two layers in one go:

  1. Low-level builders on ParquetSource — opt-in, additive, no behavior change for existing scans.
  2. End-to-end SQL — a built-in RelationPlanner (auto-registered) that lifts TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)] to a Sample extension node; a SamplePushdown optimizer rule pushes it into ParquetSource via the cube-root hybrid across files / row groups / rows. Both SessionStateBuilder::with_default_features() and DefaultPhysicalPlanner::default() register the necessary glue, so datafusion-cli and any default SessionContext get TABLESAMPLE SYSTEM out of the box. Downstream consumers can register a RelationPlanner ahead of the built-in to add other flavors (BERNOULLI, ROW, BUCKET); the existing relation_planner/table_sample.rs example shows that pattern.

What changes are included in this PR?

Layer 1 — ParquetSource sampling builders

```rust
ParquetSource::new(schema)
    .with_row_group_sampling(0.1)   // keep ~10% of row groups per file
    .with_row_fraction(0.05)        // within each kept row group, keep ~5% of rows
    .with_row_cluster_size(8192);   // controls window granularity (default 32 768)
```

with_row_group_sampling(fraction):

  • Selection is deferred until the opener has loaded the parquet footer, so we sample by real row-group index.
  • Deterministic per (file_name, row_group_count, fraction) — re-runs match.
  • Always keeps at least one row group (target = max(1, ceil(N * fraction))).
  • No-op when fraction >= 1.0.
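A minimal sketch of this deterministic selection, using std's `DefaultHasher` to derive the seed and a plain LCG in place of the PR's seeded `SmallRng` (the function name and internals are illustrative, not the PR's actual code):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical sketch: same (file_name, row_group_count, fraction)
/// always yields the same set of kept row-group indices.
fn sample_row_groups(file_name: &str, row_group_count: usize, fraction: f64) -> Vec<usize> {
    if fraction >= 1.0 {
        return (0..row_group_count).collect(); // no-op above 1.0
    }
    // target = max(1, ceil(N * fraction)) -- always keep at least one
    let target = ((row_group_count as f64 * fraction).ceil() as usize).max(1);

    // Seed deterministically from the inputs
    let mut hasher = DefaultHasher::new();
    file_name.hash(&mut hasher);
    row_group_count.hash(&mut hasher);
    fraction.to_bits().hash(&mut hasher);
    let mut state = hasher.finish();

    // Partial Fisher-Yates with an LCG: pick `target` distinct indices
    let mut indices: Vec<usize> = (0..row_group_count).collect();
    for i in 0..target {
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        let j = i + (state as usize) % (row_group_count - i);
        indices.swap(i, j);
    }
    let mut kept: Vec<usize> = indices[..target].to_vec();
    kept.sort_unstable(); // scan in file order
    kept
}

fn main() {
    let a = sample_row_groups("part-0.parquet", 20, 0.25);
    let b = sample_row_groups("part-0.parquet", 20, 0.25);
    assert_eq!(a, b); // deterministic re-runs
    assert_eq!(a.len(), 5); // ceil(20 * 0.25)
    assert_eq!(sample_row_groups("f", 100, 0.001).len(), 1); // floor of 1
    println!("{a:?}");
}
```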

with_row_fraction(fraction):

  • Translates the per-row-group target into K contiguous windows spread evenly across the row group, each placed at a random offset within its stride. Window count = ceil(target / cluster_size).
  • Materializes a RowSelection per kept row group; the parquet reader uses the page index to read only the data pages covering the selected rows. This gives "page-level" IO savings without requiring per-column page alignment (which doesn't exist in parquet).
  • Falls back gracefully when the page index is missing — the reader still returns the right rows, the IO win just disappears.
  • Deterministic per (file_name, row_group_index, fraction, cluster_size).
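The window construction can be sketched like this. The helper name is hypothetical (a later commit in this thread extracts the real logic as `build_row_window_selectors`), and a plain LCG stands in for the seeded `SmallRng`; note that capping `window_size` at the stride keeps adjacent windows disjoint:

```rust
/// Returns `(start_row, len)` pairs for one row group.
fn build_row_windows(
    total_rows: usize,
    target_rows: usize,
    cluster_size: usize,
    mut seed: u64,
) -> Vec<(usize, usize)> {
    if total_rows == 0 || target_rows == 0 || cluster_size == 0 {
        return vec![];
    }
    let target = target_rows.min(total_rows);
    // Window count = ceil(target / cluster_size)
    let n_windows = (target + cluster_size - 1) / cluster_size;
    let stride = total_rows / n_windows;
    // Cap the window at the stride so adjacent windows cannot overlap
    let window_size = ((target + n_windows - 1) / n_windows).min(stride);

    (0..n_windows)
        .map(|k| {
            seed = seed.wrapping_mul(6364136223846793005).wrapping_add(1);
            let slack = stride - window_size;
            let offset = if slack == 0 { 0 } else { (seed as usize) % (slack + 1) };
            // One window per stride, at a pseudo-random offset within it
            (k * stride + offset, window_size)
        })
        .collect()
}

fn main() {
    // 100-row row group, target 10 rows, cluster size 4:
    // 3 windows of 4 rows spread across strides of 33 rows.
    let windows = build_row_windows(100, 10, 4, 42);
    assert_eq!(windows.len(), 3);
    for &(start, len) in &windows {
        assert!(start + len <= 100);
    }
    println!("{windows:?}");
}
```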

The two layers compose: setting row_group_fraction = 0.1 and row_fraction = 0.1 reads ~1% of the rows (0.1 × 0.1) from ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group.

Layer 2 — TABLESAMPLE SYSTEM SQL → cube-root pushdown

  • Logical: Sample UserDefinedLogicalNodeCore extension node in datafusion-expr (logical_plan/sample.rs). Schema-preserving; validates fraction ∈ (0, 1]. Encodes SampleMethod::System only — by design, since BERNOULLI / BUCKET / row-count semantics differ across DBs.
  • SQL surface: new datafusion_sql::sample::TableSampleSystemPlanner RelationPlanner, public, auto-registered via SessionStateDefaults::default_relation_planners() from SessionStateBuilder::with_default_features(). Handles TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]; rejects BERNOULLI, ROW count, BUCKET ... OUT OF ..., OFFSET with errors that explicitly point users at registering a custom RelationPlanner first. register_relation_planner prepends, so a user-supplied planner that returns Planned for the same syntax wins; returning Original falls through to the built-in.
  • Physical placeholder: SampleExec in datafusion-physical-plan. Errors at execute (it is a marker node, not an executor — the SamplePushdown rule is expected to remove it). Implements filter/sort pushdown passthrough so unrelated optimizer rules see straight through it.
  • Built-in ExtensionPlanner: new public SamplePhysicalPlanner in datafusion::physical_planner, pre-registered in DefaultPhysicalPlanner::default(). Lowers Sample → SampleExec. Callers using with_extension_planners(...) replace the defaults and need to re-add SamplePhysicalPlanner if they want sampling.
  • Pushdown contract: new try_push_sample method on ExecutionPlan and FileSource, returning Absorbed { inner } / Passthrough / Unsupported { reason }. Passthrough overrides on filter, projection, coalesce, repartition, and non-fetch sort.
  • ParquetSource::try_push_sample runs the cube-root hybrid: q = p^(1/3) applied at all three levels so the IO win at small fractions doesn't concentrate at one granularity. Returns keep_files: Option<Vec<usize>> that the rule uses to rebuild FileScanConfig.file_groups.
  • SamplePushdown optimizer rule (between PushdownSort and EnsureCooperative) walks top-down. On Absorbed it replaces SampleExec with the rebuilt source; on Passthrough it pushes through the single-child node and recurses; on Unsupported it errors at planning time. There is intentionally no generic post-scan SampleExec yet — that's a follow-up.
  • EXPLAIN visibility: ParquetSource::fmt_extra surfaces sample_row_group_fraction and sample_row_fraction when set, so EXPLAIN reflects the pushdown.
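The cube-root split is easy to sanity-check: the per-level fraction 0.7937 quoted later in the EXPLAIN test is just cbrt(0.5).

```rust
fn main() {
    // Cube-root hybrid: split the requested fraction p evenly across
    // files, row groups, and rows so that q * q * q recovers p.
    let p: f64 = 0.5; // TABLESAMPLE SYSTEM (50)
    let q = p.cbrt();
    // The per-level fraction surfaced by EXPLAIN (~0.7937)
    assert!((q - 0.7937).abs() < 1e-4);
    assert!((q * q * q - p).abs() < 1e-12);
    println!("per-level fraction: {q:.4}");
}
```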

How the override path works

SessionStateBuilder::register_relation_planner inserts at the front of the planner chain. The first planner to return Planned wins; returning Original falls through. So a downstream user who wants BERNOULLI (or anything else) can:

```rust
let mut builder = SessionStateBuilder::new_with_default_features();
builder.register_relation_planner(Arc::new(MyTableSamplePlanner));
let state = builder.build();
```

MyTableSamplePlanner runs first; if it doesn't handle the input it returns Original and the built-in TableSampleSystemPlanner gets the next look. This is the same composition the existing relation_planner/table_sample.rs example demonstrates.

Internals

  • New ParquetSampling struct (re-exported at the crate root).
  • Plumbed through ParquetMorselizer → PreparedParquetOpen.
  • Two free functions in opener.rs (apply_row_group_sampling and apply_row_fraction_sampling), invoked from prune_row_groups right after create_initial_plan.
  • New dep: rand with the small_rng feature (already in workspace Cargo.toml).

Are these changes tested?

Layer 1 — ParquetSource sampling builders. 7 tests in datafusion-datasource-parquet::opener::test:

  • row_group_sampling_keeps_target_count — verifies the ceil(N * fraction) math.
  • row_group_sampling_is_deterministic — same inputs, same selection.
  • row_group_sampling_differs_per_file — different file_name, different sample.
  • row_group_sampling_no_op_when_fraction_is_one — fraction >= 1.0 keeps everything.
  • row_group_sampling_target_at_least_one — fraction = 0.001 over 100 row groups still keeps 1.
  • row_group_sampling_end_to_end — writes a 4-row-group parquet to InMemory, scans with fraction = 0.5, asserts exactly 6 rows out (2 row groups × 3 rows).
  • row_fraction_end_to_end — writes a 100-row single-row-group parquet, scans with row_fraction = 0.1 and cluster_size = 4, asserts the result is in the (1, 16] range.

Layer 2 — pushdown contract. 4 unit tests on the Sample logical node + 7 unit tests on ParquetSource::try_push_sample directly:

  • try_push_sample_system_full_is_noop — fraction >= 1.0 doesn't configure any sampling.
  • try_push_sample_system_configures_cube_root_on_source — both row_group_fraction and row_fraction are set to cbrt(p).
  • try_push_sample_system_drops_files_for_multi_file_scan — file-level keep_files is the right size and sorted/unique.
  • try_push_sample_system_keeps_at_least_one_file — never drops to zero.
  • try_push_sample_system_skips_file_drop_for_single_file — single-file scans return keep_files = None.
  • try_push_sample_system_repeatable_seed_is_deterministic — same seed → same selection; different seeds → different selection.
  • try_push_sample_system_target_clamped_to_num_files — when target == num_files, returns None to skip the rebuild.

Layer 2 — end-to-end SQL. New datafusion/sqllogictest/test_files/tablesample.slt runs against a default SessionContext, exercising the same path datafusion-cli users get:

  • SYSTEM(100) returns every row.
  • SYSTEM(50) REPEATABLE(42) is deterministic — re-runs match. Asserts the cube-root math gives 813 rows for a single-file / single-row-group parquet.
  • EXPLAIN confirms SampleExec is gone and ParquetSource advertises its sampling config (sample_row_group_fraction=0.7937, sample_row_fraction=0.7937).
  • Each rejection path (BERNOULLI, ROW count, fraction out of range) produces the right error.

cargo build --workspace, cargo fmt --all, and cargo clippy --workspace --exclude datafusion-benchmarks --all-targets -- -D warnings are clean.

Are there any user-facing changes?

  • New SQL surface (out of the box): SELECT … FROM t TABLESAMPLE SYSTEM (p) [REPEATABLE (seed)] works against any plan that bottoms out at a ParquetSource on a default SessionContext. datafusion-cli picks this up with no flag changes. Other forms error at planning time with messages that point at registering a custom RelationPlanner first.
  • New public Rust API on ParquetSource: with_row_group_sampling, with_row_fraction, with_row_cluster_size, sampling(), plus the ParquetSampling struct.
  • New public Rust API on datafusion-expr: Sample extension node + SampleMethod::System + sample_plan helper.
  • New public Rust API on datafusion-sql: sample::TableSampleSystemPlanner (the built-in RelationPlanner).
  • New trait methods on ExecutionPlan and FileSource: try_push_sample (default Unsupported), and the SamplePushdownResult / SampleSpec / SampleMethod / FileSourceSampleResult types in datafusion-physical-plan and datafusion-datasource.
  • New public SamplePhysicalPlanner in datafusion::physical_planner. Pre-registered in DefaultPhysicalPlanner::default(). Callers using with_extension_planners(...) replace the defaults and need to re-add it.
  • New SamplePushdown optimizer rule in the default physical pipeline. No-op when no SampleExec is in the plan; errors at planning time if a SampleExec exists and can't be pushed (no generic post-scan executor yet).
  • New default RelationPlanner registered by SessionStateBuilder::with_default_features(). A user-supplied RelationPlanner registered via register_relation_planner runs first and can override or supplement built-in semantics.
  • Existing scans / queries that don't use TABLESAMPLE are unaffected.

@github-actions github-actions Bot added the datasource Changes to the datasource crate label May 3, 2026
@github-actions github-actions Bot added sql SQL Planner logical-expr Logical plan and expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels May 3, 2026
@adriangb adriangb changed the title feat(parquet): row-group and row-range sampling on ParquetSource feat: TABLESAMPLE SYSTEM end-to-end + row-group / row sampling on ParquetSource May 3, 2026
@adriangb adriangb marked this pull request as ready for review May 3, 2026 16:16
@github-actions github-actions Bot removed the sql SQL Planner label May 3, 2026
@adriangb
Contributor Author

adriangb commented May 4, 2026

I've looked into the history of #16533 a bit.

In particular the decision in #16505 to add hooks (#17843) instead of integrating into core directly. The example in #17843 does post-scan sampling, once the IO cost has been paid. This PR attempts to do much more efficient IO level sampling (files, row groups, pages). In order for this to work we need to add APIs in ParquetSource, ExecutionPlan, etc. At that point I'm not sure an "external extension" approach is worth it. It seems that it's simpler to just bake the feature into DataFusion. The implementation in this PR splits the difference: it uses the RelationPlanner and extension logical plan nodes, but bundles it with SessionStateBuilder::with_default_features(). Personally I would like to at least add Sample to enum LogicalPlan, I don't find the WASM argument very strong but maybe that's just me.

@geoffreyclaude @alamb wdyt?

@github-actions github-actions Bot added sql SQL Planner documentation Improvements or additions to documentation labels May 4, 2026
@geoffreyclaude
Contributor

@geoffreyclaude @alamb wdyt?

Thanks for the ping @adriangb!

My take is that the 'RelationPlanner' direction still makes sense here. The point was not that TABLESAMPLE should never be shipped by DataFusion, but that the SQL semantics should remain overridable.

So I think your split makes a lot of sense: put the parquet and physical pushdown in core, but leave the SQL semantics to RelationPlanner through the TableSampleSystemPlanner you introduced. Since custom planners are inserted first (by design), downstream users can still override the built-in SYSTEM semantics if they need to.

On adding Sample to enum LogicalPlan, I'm not fundamentally against it, but it should probably be done in a dedicated follow-up PR. Maybe that follow-up PR should also wire it up more completely with a DataFrame API, generic SampleExec for when pushdown isn't supported, sampled-stats as discussed in #21624... On its own, I don't see the advantage of Sample over UserDefinedLogicalNodeCore, especially given the amount of new boilerplate code it will introduce.

@adriangb
Contributor Author

adriangb commented May 4, 2026

Okay makes sense then, we're on the same page 😄. Since you've worked on this before would you like to review this PR?

@adriangb
Contributor Author

adriangb commented May 4, 2026

I'm also happy to split it up:

  • PR 1: add methods to ParquetSource, etc.
  • PR 2: add ExecutionPlan trait methods, SampleExec physical plan and pushdown optimizer rule
  • PR 3: add TableSampleSystemPlanner and wire it all up

Up to you...

@adriangb adriangb force-pushed the parquet-sampling branch 2 times, most recently from 2cc8d76 to f3270f1 Compare May 4, 2026 14:52
@adriangb
Contributor Author

adriangb commented May 4, 2026

I've cleaned up the history into stacked commits in case that's helpful.

@geoffreyclaude
Contributor

I've cleaned up the history into stacked commits in case that's helpful.

That's super useful, thanks! I'll give it a look tomorrow.

@alamb
Contributor

alamb commented May 4, 2026

DataFusion has the machinery for fine-grained parquet sampling (ParquetAccessPlan with Skip / Scan / Selection(RowSelection)) but no public way to ask for a sample without constructing the access plan by hand and stuffing it into PartitionedFile.extensions, and no SQL surface at all. That works for one-off code but is awkward for:

My personal rationale was that all the different SQL systems did sampling differently -- so any particular choice for sampling is probably fine but I wasn't at all sure there would be enough commonality across implementations to put it into DataFusion core

Also, what is the problem with constructing the access plan by hand? For this type of low-level access pattern (particular sampling methods etc.) it seems like low-level construction is just the escape valve that is needed (super fine-grained control)

I am very wary of complicating the built-in Parquet reader any more -- it is already very complicated with lots of behaviors (and new ones getting added all the time, for example the sortedness ones from @zhuqi-lucas and @xudong963 )

So adding APIs to make it easier to extend / modify plans makes sense to me, but hard coding more sampling into the core is much less clear to me

@alamb
Contributor

alamb commented May 4, 2026

Did you try and implement whatever sampling you need with just the existing APIs? Aka no change to the core? If that wasn't possible, what was missing in the API?

@adriangb
Contributor Author

adriangb commented May 4, 2026

Thanks for reviewing @alamb!

Did you try and implement whatever sampling you need with just the existing APIs? Aka no change to the core? If that wasn't possible, what was missing in the API?

Yes, it won't work sadly. With the current APIs you could maybe sample at the file level, but not at the row group or page level (which would require reading and parsing the parquet footers for all files up front). The way I implemented things here, that work is lazily deferred to file opening, so nothing is wasted.

Also, requiring Rust glue to make it work precludes this from working with e.g. datafusion-cli. I think it is a useful primitive to have in datafusion-cli (e.g. for data exploration) and a strong building block for work like #21624

On the ExecutionPlan level, unless we add some APIs, there would be a lot of tight coupling (mainly via downcast matching on both ends) between the custom optimizer rule, the custom physical plan, and the data source. I.e. the optimizer rule needs to downcast-match every plan in the path between the SampleExec and the ParquetSource and understand them. It is incompatible with any other custom plans, custom data sources, etc. (i.e. composition is broken). The way I've set it up in this PR, if e.g. Vortex wanted to implement table sampling they could do that in a self-contained way inside their file source implementation quite easily.

@adriangb
Contributor Author

adriangb commented May 4, 2026

The actual changes to opener.rs are quite small, just 2 blocks of (IMO) easily understandable code to update the access plan: 3d0dc4a.

@adriangb
Contributor Author

adriangb commented May 4, 2026

I am very wary of complicating the built-in Parquet reader any more -- it is already very complicated with lots of behaviors (and new ones getting added all the time, for example the sortedness ones from @zhuqi-lucas and @xudong963 )

I agree it is a complex piece of software but I think we can continue to add the right abstractions and simplifications (like you recently did with the morselization work 😄 ). Ultimately the file reader is going to be a key piece of a data toolkit like DataFusion so it's unsurprising (to me) that it holds a lot of the complexity.

@alamb
Contributor

alamb commented May 4, 2026

I am very wary of complicating the built-in Parquet reader any more -- it is already very complicated with lots of behaviors (and new ones getting added all the time, for example the sortedness ones from @zhuqi-lucas and @xudong963 )

I agree it is a complex piece of software but I think we can continue to add the right abstractions and simplifications (like you recently did with the morselization work 😄 ). Ultimately the file reader is going to be a key piece of a data toolkit like DataFusion so it's unsurprising (to me) that it holds a lot of the complexity.

yeah -- maybe I am oversensitive, as I feel like as soon as we are able to refactor away some of the complexity it gets all complicated again 😆

@adriangb
Contributor Author

adriangb commented May 4, 2026

I am very wary of complicating the built-in Parquet reader any more -- it is already very complicated with lots of behaviors (and new ones getting added all the time, for example the sortedness ones from @zhuqi-lucas and @xudong963 )

I agree it is a complex piece of software but I think we can continue to add the right abstractions and simplifications (like you recently did with the morselization work 😄 ). Ultimately the file reader is going to be a key piece of a data toolkit like DataFusion so it's unsurprising (to me) that it holds a lot of the complexity.

yeah -- maybe I am oversensitive, as I feel like as soon as we are able to refactor away some of the complexity it gets all complicated again 😆

No you are right: it is a big risk that this code turns into feature spaghetti. It's just not one I think we can necessarily avoid. We should be cautious about introducing complexity and push back (like you have here) but if this is the right place to put it and we can factor it into a shape that only adds complexity, not multiplies or exponentiates it, then maybe we just need to deal with it over time.

@adriangb adriangb force-pushed the parquet-sampling branch 2 times, most recently from 66d2fc2 to 0661eba Compare May 4, 2026 21:04
Adds two opt-in sampling primitives to parquet scans, both built on
the existing `ParquetAccessPlan` infrastructure:

* `ParquetSource::with_row_group_sampling(fraction)` — keep `fraction`
  of row groups in each scanned file. Selection is deferred until the
  opener has loaded the parquet footer (so we sample by real row-group
  index, not guess) and is deterministic per `(file_name,
  row_group_count, fraction)` via a seeded `SmallRng`.

* `ParquetSource::with_row_fraction(fraction)` — within each kept row
  group, keep `fraction` of rows by translating to a `RowSelection` of
  K small contiguous windows (size controlled by
  `with_row_cluster_size`, default 32 768 rows). The parquet reader
  uses the page index to read only the data pages covering the
  selected rows, so this gives "page-level" IO savings without
  requiring per-column page alignment. Falls back gracefully (no
  IO win, still correct) when the page index is missing.

The two layers compose: scanning with both `row_group_fraction=0.1`
and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row
groups, with windows spread out so the sample isn't clustered at one
end of each row group.

Selection within a row group is deterministic-but-random per
`(file_name, row_group_index, fraction, cluster_size)` — same inputs
yield the same windows, so re-runs are repeatable.

## Why this lives on `ParquetSource`

The natural entry-point for "I want a sample" is at config time,
before any metadata IO. The actual *which* row groups / *which* rows
selection still has to be deferred to the opener (after the footer is
parsed) — that's why `ParquetSampling` carries fractions plus a cluster
size, and the opener pulls them through to its lazy decision points.

This is intentionally orthogonal to file-level sampling: `ParquetSource`
doesn't own the file list (`FileScanConfig.file_groups` does), so a
file-fraction setter here would have been a confusing no-op. Callers
that want to drop files should rebuild the `FileScanConfig` directly.

## Use cases

* `TABLESAMPLE` SQL syntax (any future implementation can lower to
  these primitives).
* Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample.
* Mini-query-style stats sampling (a layered helper can call these
  to bound the cost of computing approximate min/max/NDV/histograms
  for the optimizer — out of scope here, see the linked POC in the
  PR description).
* `EXPLAIN ANALYZE`-driven debug runs against a representative slice.

## Tests

5 unit tests on `apply_row_group_sampling` (target count, determinism,
file-name dependence, no-op at fraction=1.0, target floor of 1) plus
2 end-to-end tests that build a real parquet file in `InMemory` object
store and confirm the row counts emitted are what the sampling implies.

`cargo build --workspace`, `cargo fmt --all`, and
`cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings`
are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@adriangb adriangb force-pushed the parquet-sampling branch from 0661eba to 6ca69bf Compare May 4, 2026 22:01
@adriangb
Contributor Author

adriangb commented May 4, 2026

@alamb I've restructured some more and pulled out the new code into sampling.rs so that the changes to opener.rs (aside from tests, imports, etc.) are ~10 LOC. I hope this helps 🙇🏻

2026412#diff-bbd611d7b35d7f17633eebbf32a07dc9e394f20135754ed949751e8030049e38

@adriangb
Contributor Author

adriangb commented May 5, 2026

Regarding how other systems implement this: it seems most columnar, analytical DBs are approximate for the same reasons we are:

https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-sampling


https://docs.cloud.google.com/bigquery/docs/table-sampling


https://duckdb.org/docs/current/sql/samples


Similar for Spark and Trino.

I think it's also worth highlighting that DuckDB has the behavior built in.

In other words @alamb: if we merged 2026412 that's at least enough for someone like me to make it work in my system / in future DataFusion use cases. But I think it's valuable to take it one step further and expose this to users of datafusion-cli, etc.

Contributor

@geoffreyclaude geoffreyclaude left a comment


I did a quick pass and raised a few minor issues. Since most of the issues are with the commits that build on top of the initial parquet-only change, I'd suggest extracting the first commit into a dedicated PR: it brings immediate value and can be reviewed and merged on its own.
Keep this one open for context though.

pub(crate) system_target_remaining: Option<f64>,
/// Optional `REPEATABLE(seed)` value plumbed through from
/// `TABLESAMPLE`. When set, the row-group and row-fraction
/// samplers ignore the file path and key only off the seed +
Contributor


Should we reconsider ignoring the file path entirely for REPEATABLE? I agree we want to avoid environment specificities (as full paths) to be reproducible across environments, but keying only on (seed, row_group_index, fraction, cluster_size) means identical file layout will select the same row groups / row windows, creating cross-file correlation.

How about using the file index (or some other stable id) instead of the full file path, so REPEATABLE stays reproducible without making files correlate?

)?);
)?;

// SYSTEM-mode adaptive split: when the SamplePushdown rule
Contributor


These two comments have a lot of duplication. How about keeping only the large inner block for details?

let seed = hasher.finish();
let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);

let target_rows = ((total_rows as f64) * fraction).ceil().max(1.0) as usize;
Contributor


This code seems pretty risky and error-prone, especially with regards to possible duplicate sampling on edge cases. Can you extract it to a dedicated function and fuzz it to ensure there aren't any window overlaps, out of bounds, general weirdness?
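A property check along the requested lines could be sketched like this; the window construction below is a hypothetical stand-in for the extracted helper (a real fuzz test would exercise the actual function), and the invariants asserted are the safe ones: in-bounds, non-overlapping, deterministic.

```rust
// Hypothetical stand-in for the extracted window builder.
fn build_row_windows(total: usize, target: usize, cluster: usize, mut seed: u64) -> Vec<(usize, usize)> {
    if total == 0 || target == 0 || cluster == 0 {
        return vec![];
    }
    let target = target.min(total);
    let n = (target + cluster - 1) / cluster;          // ceil(target / cluster)
    let stride = total / n;
    let window = ((target + n - 1) / n).min(stride);   // cap at stride: disjointness
    (0..n)
        .map(|k| {
            seed = seed.wrapping_mul(6364136223846793005).wrapping_add(1);
            let slack = stride - window;
            let offset = if slack == 0 { 0 } else { (seed as usize) % (slack + 1) };
            (k * stride + offset, window)
        })
        .collect()
}

fn main() {
    // Randomized configurations: every layout must be in-bounds,
    // non-overlapping, and deterministic for a fixed seed.
    let mut s: u64 = 0xDEAD_BEEF;
    for _ in 0..5_000 {
        s = s.wrapping_mul(6364136223846793005).wrapping_add(1);
        let total = 1 + (s as usize) % 100_000;
        let target = 1 + ((s >> 17) as usize) % total;
        let cluster = 1 + ((s >> 33) as usize) % 65_536;
        let w = build_row_windows(total, target, cluster, s);
        let mut prev_end = 0;
        for &(start, len) in &w {
            assert!(start >= prev_end, "windows overlap");
            assert!(start + len <= total, "window out of bounds");
            prev_end = start + len;
        }
        assert_eq!(w, build_row_windows(total, target, cluster, s));
    }
    println!("5000 randomized configurations passed");
}
```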

Comment thread datafusion/sql/src/sample.rs Outdated
// The built-in planner only handles SYSTEM (and BLOCK as an
// alias for SYSTEM, matching Hive). Anything else is a
// semantics commitment we don't want to make in core.
Some(TableSampleMethod::System) | Some(TableSampleMethod::Block) | None => {}
Contributor


The None arm means the default if unspecified is SYSTEM: is this what we want?

Comment thread docs/source/user-guide/sql/select.md Outdated

`REPEATABLE(seed)` mixes the seed into every random draw, so all
levels produce the same selection across runs. The selection also
depends on the file name, the row-group index within the file, and
Contributor


It currently actually doesn't depend on file name. See my related comment for a suggested dependency of file index instead.

Pushdown::Pushed(new_child) => Ok(Transformed::yes(new_child)),
Pushdown::Failed(reason) => {
datafusion_common::plan_err!(
"TABLESAMPLE is not supported for this source: {reason}. \
Contributor


source here can be confusing if pushdown failed at an intermediary node. How about: "TABLESAMPLE could not be pushed down: {reason}"?

// through. Routed against the parquet-backed copy of the table so
// the `SamplePushdown` rule can absorb the sample into the scan.
// `REPEATABLE(42)` makes the rows deterministic across runs and
// across machines (the seed dominates the file path in the
Contributor


Same comment issue as in other places with REPEATABLE which currently ignores file path.

@adriangb
Contributor Author

adriangb commented May 5, 2026

Thanks @geoffreyclaude ! I broke out #22024 and addressed the comments relevant to that bit there and cherry picked back to here.

@alamb
Contributor

alamb commented May 5, 2026

@alamb I've restructured some more and pulled out new code into sampling.rs so that the changes to opener.rs (aside from tests, imports, etc.) is ~ 10 LOC. I hope this helps 🙇🏻

Thank you

Basically I really want to get DataFusion to the top of ClickBench, so I am trying to focus on getting that done before adding more features (that isn't to say we shouldn't add more features, I am just giving my context)

@alamb
Contributor

alamb commented May 5, 2026

Thanks @geoffreyclaude ! I broke out #22024 and addressed the comments relevant to that bit there and cherry picked back to here.

Will review this later today

adriangb added a commit to pydantic/datafusion that referenced this pull request May 5, 2026
adriangb added a commit to pydantic/datafusion that referenced this pull request May 5, 2026
adriangb and others added 5 commits May 5, 2026 13:02
Adds the cross-cutting infrastructure for pushing TABLESAMPLE-shaped
sampling into file sources, with parquet as the first absorbing
source. There is no SQL surface yet; this commit only ships the
primitives. Wiring a RelationPlanner / ExtensionPlanner so it works
out of the box from SQL is the next commit in this stack.

- `Sample` `UserDefinedLogicalNodeCore` extension node in
  `datafusion-expr` (`logical_plan/sample.rs`). Schema-preserving;
  validates `fraction ∈ (0, 1]`. Currently encodes
  `SampleMethod::System` only.
- `SampleExec` placeholder in `datafusion-physical-plan`. Errors at
  `execute` (it's a marker — the `SamplePushdown` rule is expected
  to remove it). Implements filter / sort pushdown passthrough so
  unrelated optimizer rules see straight through it.
- New `try_push_sample` method on `ExecutionPlan` and `FileSource`,
  returning `Absorbed { inner }` / `Passthrough` / `Unsupported
  { reason }`. Default is `Unsupported`; per-node `Passthrough`
  overrides on filter, projection, coalesce_batches,
  coalesce_partitions, repartition, and non-fetch sort.
- `ParquetSource::try_push_sample` runs the (intentionally private)
  hierarchical block-level reduction across files / row groups /
  rows, with adaptive collapse when an axis can't reduce. Coordinates
  with the opener via `pub(crate)` `system_target_remaining` and
  `seed` fields on `ParquetSampling`. Single-file, single-row-group
  inputs hit ~p × N rows instead of undershooting at p^(1/3) × N.
- `REPEATABLE(seed)` is plumbed all the way through: when set,
  `ParquetSampling::apply_row_group_sampling` and
  `apply_row_fraction_sampling` key only on `(seed, ...)` and ignore
  the file path, so the same query is reproducible across
  environments.
- `SamplePushdown` optimizer rule (between `PushdownSort` and
  `EnsureCooperative`) walks top-down. On `Absorbed` it replaces
  `SampleExec` with the rebuilt source; on `Passthrough` it pushes
  through the single-child node and recurses; on `Unsupported` it
  errors at planning time with `"TABLESAMPLE is not supported for
  this source"`. There is intentionally no generic post-scan
  `SampleExec` yet.
- EXPLAIN visibility: `ParquetSource::fmt_extra` surfaces
  `sample_system_target_remaining` when set.
- `optimizer_rule_reference.md` updated to list `SamplePushdown` in
  the documented rule order.
- `explain.slt` updated with `physical_plan after SamplePushdown SAME
  TEXT AS ABOVE` lines under each verbose-explain test.
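The adaptive collapse above can be sketched numerically. This is a hedged illustration, not the PR's (intentionally private) implementation: `per_axis_fraction` is a hypothetical helper that only shows why a single-file, single-row-group input should take the full fraction on the one axis that can still reduce, rather than undershooting at p^(1/3).

```rust
// Hypothetical sketch of the adaptive fraction split described above.
// With three reducible axes (files, row groups, rows), each axis keeps
// roughly p^(1/3) of its blocks so the product is ~p. When an axis
// cannot reduce (e.g. a single file), the exponent shrinks so the
// remaining axes compensate instead of undershooting.
fn per_axis_fraction(p: f64, reducible_axes: u32) -> f64 {
    assert!(p > 0.0 && p <= 1.0);
    if reducible_axes == 0 {
        return 1.0; // nothing to reduce; scan everything
    }
    p.powf(1.0 / reducible_axes as f64)
}

fn main() {
    // Three axes: files, row groups, rows.
    let three = per_axis_fraction(0.10, 3);
    // Single file with a single row group: only the row axis can
    // reduce, so it takes the full fraction and we hit ~p * N rows.
    let one = per_axis_fraction(0.10, 1);
    println!("three-axis per-axis fraction: {three:.4}"); // ~0.4642
    println!("single-axis fraction: {one:.4}");
    assert!((three.powi(3) - 0.10).abs() < 1e-12);
    assert!((one - 0.10).abs() < 1e-12);
}
```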

Tests: 7 unit tests on `ParquetSource::try_push_sample` covering the
pushdown contract (full / single-file / multi-file / target clamping
/ REPEATABLE determinism / multi-file rounding compensation), and 3
opener end-to-end tests covering the adaptive split for single vs
multi row group inputs and REPEATABLE-seed reproducibility across
file paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wires SQL `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` into the
infrastructure from the previous commit so it works on a default
`SessionContext` (and therefore in `datafusion-cli` and the
sqllogictest harness) without any extra registration.

- New `datafusion_sql::sample::TableSampleSystemPlanner`
  (`RelationPlanner`). Lifts `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]`
  to the core `Sample` extension node. Other forms (`BERNOULLI`,
  `ROW` count, `BUCKET ... OUT OF ...`, `OFFSET`) are rejected at
  planning time with errors that point at registering a custom
  `RelationPlanner` ahead of this one.
- New public `SamplePhysicalPlanner` (`ExtensionPlanner`) in
  `datafusion::physical_planner`. Lowers `Sample` to `SampleExec`.
  Pre-registered in `DefaultPhysicalPlanner::default()` so the
  default query planner handles it.
- `SessionStateDefaults::default_relation_planners()` returns the
  built-in planner; `SessionStateBuilder::with_default_features()`
  installs it. Both gated behind the `sql` feature so
  `--no-default-features` builds keep working.
- `register_relation_planner` already prepends to the chain, so any
  user-supplied planner runs first and can return `Original` to fall
  through to the built-in for SYSTEM. That composition is the
  intended override mechanism.
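The chain composition described above can be sketched with stand-in types. `Planner`, `Planning`, and `resolve` here are hypothetical simplifications of the RelationPlanner API (which carries SQL AST and logical plans, not strings); the sketch only shows the prepend-then-fall-through resolution order:

```rust
// Minimal sketch of the prepend-and-fall-through planner chain.
enum Planning {
    Planned(String), // a planner claimed the relation
    Original,        // fall through to the next planner in the chain
}

trait Planner {
    fn plan(&self, method: &str) -> Planning;
}

// A user planner that only implements BERNOULLI and defers the rest.
struct BernoulliOnly;
impl Planner for BernoulliOnly {
    fn plan(&self, method: &str) -> Planning {
        match method {
            "BERNOULLI" => Planning::Planned("custom bernoulli plan".into()),
            _ => Planning::Original, // let the built-in handle SYSTEM
        }
    }
}

// The built-in planner: handles SYSTEM, rejects everything else.
struct BuiltInSystem;
impl Planner for BuiltInSystem {
    fn plan(&self, method: &str) -> Planning {
        match method {
            "SYSTEM" => Planning::Planned("core Sample node".into()),
            other => Planning::Planned(format!("error: {other} not supported")),
        }
    }
}

// Registration prepends, so user planners run before the built-in.
fn resolve(chain: &[&dyn Planner], method: &str) -> String {
    for p in chain {
        if let Planning::Planned(plan) = p.plan(method) {
            return plan;
        }
    }
    unreachable!("built-in planner always answers")
}

fn main() {
    let chain: [&dyn Planner; 2] = [&BernoulliOnly, &BuiltInSystem];
    assert_eq!(resolve(&chain, "SYSTEM"), "core Sample node");
    assert_eq!(resolve(&chain, "BERNOULLI"), "custom bernoulli plan");
}
```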

End-to-end coverage:

- New `datafusion/sqllogictest/test_files/tablesample.slt` exercises
  the path a user gets out of the box: `SYSTEM(100)`, `SYSTEM(50)
  REPEATABLE(42)` deterministic count, EXPLAIN absorbed into
  ParquetSource, every rejected form, and the planning-time error
  for sources that don't implement `try_push_sample` (CSV).

Docs:

- `docs/source/user-guide/sql/select.md` gains a `TABLESAMPLE clause`
  section explaining what it is, the SYSTEM vs BERNOULLI tradeoff,
  the parquet implementation strategy, deterministic seeds, the
  EXPLAIN format, and the list of rejected forms.
- `docs/source/library-user-guide/extending-sql.md` reframes the
  existing TABLESAMPLE example as the way to add additional flavours
  on top of the built-in SYSTEM planner.
- `datafusion-examples/examples/relation_planner/main.rs` carries a
  matching note in its module docs.
- `datafusion-examples/README.md` regenerated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The relation_planner/table_sample.rs example previously errored on
TABLESAMPLE methods other than BERNOULLI / ROW. After commit 3 of
this stack registers a built-in TableSampleSystemPlanner by default,
the example's planner runs *first* in the chain, so its error
short-circuits SYSTEM and prevents the built-in from handling it.

Reshape the example to use the chain composition pattern instead:

- The planner now returns `RelationPlanning::Original` for methods it
  doesn't implement (SYSTEM, BLOCK, anything else), so the built-in
  picks up the slack. **No SYSTEM reimplementation in the example.**
- The example's `TableSampleQueryPlanner` registers both
  `TableSampleExtensionPlanner` (this example's `TableSamplePlanNode`
  → its own `SampleExec`) **and** `SamplePhysicalPlanner` (core
  `Sample` → core `SampleExec`), since
  `with_extension_planners(...)` replaces the defaults.
- Adds a parquet-backed `sample_data_parquet` table so SYSTEM has a
  source it can push into (the existing in-memory `sample_data`
  doesn't implement `try_push_sample`).
- New Example 7 demonstrates `TABLESAMPLE SYSTEM (50) REPEATABLE
  (42)` against the parquet table; the row count is asserted (the
  exact rows shift with the tempdir path that gets hashed into the
  per-file seed, but the count from the cube-root math is stable).
- Module docstring rewritten to lead with the composition story.

The 3 existing relation_planner example tests (match_recognize,
pivot_unpivot, table_sample) still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two changes responding to review on the parent commit:

1. Key sampling on a stable `file_index` instead of `file_name`
   (apache#22000 (comment)).

   Both `apply_row_group_sampling` and `apply_row_fraction_sampling`
   now take `file_index: usize` rather than `file_name: &str`. The
   parquet opener passes the execution `partition_index`. This makes
   sampling reproducible across environments (no dependency on the
   on-disk path), while still decorrelating files assigned to
   different partitions.

2. Extract the row-window selection into `build_row_window_selectors`
   and add fuzz coverage
   (apache#22000 (comment)).

   The previous inline arithmetic could produce overlapping windows
   when `target_rows` was close to `total_rows`: `window_size =
   ceil(target / n_windows)` could exceed `stride = total / n_windows`,
   so adjacent strides' windows would intersect. The extracted
   function caps `window_size` at `stride` (the construction that
   guarantees disjointness) and is covered by:

   * `row_window_selection_basic_layout` — hand-checked anchor case.
   * `row_window_selection_returns_none_on_invalid_input` — degenerate
     inputs return `None` cleanly.
   * `row_window_selection_full_target_no_overlap` — the previously
     buggy `target_rows == total_rows` case.
   * `row_window_selection_fuzz_invariants` — 5 000 randomized
     `(total_rows, target_rows, cluster_size, seed)` configurations,
     asserting full coverage, in-bounds positions, and no overlap.
   * `row_window_selection_fuzz_determinism` — 1 000 iterations
     verifying identical seeds produce identical layouts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
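The capped-window construction from item 2 can be sketched as follows. `row_windows` is a hypothetical simplification of `build_row_window_selectors`: it omits the seeded placement of each window inside its stride (every window starts at the stride origin here), but it shows why capping `window_size` at `stride` makes adjacent windows disjoint even when `target_rows` is close to `total_rows`:

```rust
// Simplified sketch of the disjoint-window construction.
fn row_windows(
    total_rows: usize,
    target_rows: usize,
    n_windows: usize,
) -> Option<Vec<(usize, usize)>> {
    // Degenerate inputs return None cleanly, as in the real function.
    if total_rows == 0 || target_rows == 0 || n_windows == 0 || target_rows > total_rows {
        return None;
    }
    let stride = total_rows / n_windows;
    if stride == 0 {
        return None;
    }
    // ceil(target / n_windows), capped at stride so adjacent strides'
    // windows can never intersect (the bug the fuzz tests locked down:
    // uncapped, the ceiling can exceed the stride).
    let window_size = target_rows.div_ceil(n_windows).min(stride);
    Some((0..n_windows).map(|i| (i * stride, window_size)).collect())
}

fn main() {
    // The previously buggy shape: target_rows == total_rows.
    let windows = row_windows(100, 100, 7).unwrap();
    for pair in windows.windows(2) {
        let (start_a, len_a) = pair[0];
        let (start_b, _) = pair[1];
        assert!(start_a + len_a <= start_b, "windows overlap");
    }
    let (last_start, last_len) = *windows.last().unwrap();
    assert!(last_start + last_len <= 100, "window out of bounds");
}
```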
Follow-ups to the cherry-picked refactor that landed the file_index
keying:

* Reject `TABLESAMPLE` without an explicit method instead of silently
  treating it as `SYSTEM`
  (apache#22000 (comment)).
  PostgreSQL requires an explicit method and Spark defaults to
  block-level; picking one here in core would commit to semantics
  callers may not want. Added an slt case to lock the new error.
* Rephrase the `SamplePushdown` planning error from "TABLESAMPLE is
  not supported for this source" to "TABLESAMPLE could not be pushed
  down" since the failure may originate at any node along the
  passthrough chain, not just the leaf source
  (apache#22000 (comment)).
  Updated the slt assertion to match.
* Dedupe the SYSTEM-mode adaptive split comments in the parquet
  opener; the outer block now covers determinism and the inner block
  covers the row-group-vs-row split math without overlap
  (apache#22000 (comment)).
* Update the `select.md` and `relation_planner/table_sample.rs`
  REPEATABLE wording to reflect that sampling now keys on the
  execution `partition_index`, not the on-disk file path
  (apache#22000 (comment)
   and #discussion_r3187445171).
* Replace the opener-level "REPEATABLE ignores file name" test with a
  "sampling keys on partition_index" test that verifies same
  partition_index → same selection regardless of file name and
  different partition_index → uncorrelated samples. Added
  `with_partition_index` to the test builder.
* Refresh the `run_examples-7` snapshot to match the new seed mix
  (the per-row-group hash now folds in the optional REPEATABLE seed
  alongside `file_index`; deterministic but a different draw).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
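The partition-keyed determinism described above can be sketched with a toy hash. `keep_row_group` is hypothetical, not the PR's actual hash, but the shape is the same idea: fold the optional `REPEATABLE` seed and the `file_index` (never the on-disk path) into a deterministic draw, so the same query reproduces across environments while different partitions stay decorrelated:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical per-row-group keep/skip decision keyed on
// (REPEATABLE seed, file_index, row_group_index, fraction).
fn keep_row_group(seed: Option<u64>, file_index: usize, row_group: usize, fraction: f64) -> bool {
    let mut h = DefaultHasher::new();
    seed.hash(&mut h);
    file_index.hash(&mut h);
    row_group.hash(&mut h);
    // Map the hash to [0, 1) and compare against the sampling fraction.
    let draw = (h.finish() % 1_000_000) as f64 / 1_000_000.0;
    draw < fraction
}

fn main() {
    // Same (seed, file_index, row_group) => same decision, regardless
    // of which file path the row group came from.
    let a = keep_row_group(Some(42), 0, 3, 0.5);
    let b = keep_row_group(Some(42), 0, 3, 0.5);
    assert_eq!(a, b);
    // Different partition indexes decorrelate even under one seed.
    let kept: Vec<bool> = (0..8).map(|f| keep_row_group(Some(42), f, 0, 0.5)).collect();
    println!("{kept:?}");
}
```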
@adriangb adriangb force-pushed the parquet-sampling branch from f3fb436 to fa5e9d0 Compare May 5, 2026 18:03

Labels

- core (Core DataFusion crate)
- datasource (Changes to the datasource crate)
- documentation (Improvements or additions to documentation)
- logical-expr (Logical plan and expressions)
- optimizer (Optimizer rules)
- physical-plan (Changes to the physical-plan crate)
- sql (SQL Planner)
- sqllogictest (SQL Logic Tests (.slt))


Development

Successfully merging this pull request may close these issues.

Feature: Support Sample

3 participants