Commit ea101ed

adriangb and claude committed
catalog: query-aware statistics requests via ScanArgs / ScanResult
Adds an opt-in handshake that lets callers ask a `TableProvider` for specific stats by name and receive only what the provider can answer cheaply, instead of the all-or-nothing dense `Statistics` we have today.

## What's new

* `datafusion-common::stats::StatisticsRequest` — enum of stat kinds that mirror `Statistics` / `ColumnStatistics` (Min, Max, NullCount, DistinctCount, Sum, ByteSize, RowCount, TotalByteSize). `Hash + Eq` so it can key a `HashMap`.
* `datafusion-common::stats::StatisticsValue` — `Scalar(Precision<...>) | Distribution(Arc<dyn Any>) | Sketch(Arc<dyn Any>) | Absent`. Whether a value is exact or estimated travels in the `Precision` wrapper, not the variant.
* `ScanArgs::with_statistics_requests` / `statistics_requests()` — the caller's question.
* `ScanResult::with_statistics` / `statistics()` / `into_parts()` — the provider's answer, paired 1:1 with the requests slice.
* `PartitionedFile::satisfied_stats` — a sparse `Arc<HashMap<StatisticsRequest, StatisticsValue>>` of per-file answers. Memory scales with what was asked, not with table width. Providers that store stats out-of-band (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs) can populate this directly without rebuilding a full dense `Statistics`.
* `FilePruner` learns to consume the sparse map. Internally, `file_stats_pruning` is now `Box<dyn PruningStatistics + Send + Sync>` so we can dispatch between the existing `PrunableStatistics` (dense) and a new `SparseFilePruningStats` adapter (sparse). The sparse adapter looks up each `StatisticsRequest` directly in the map and materializes single-row arrays only for the columns the pruning predicate touches — no densify-then-throw-away.
* `ListingTable::scan_with_args` populates `ScanResult.statistics` from the merged dense `Statistics` it already computed when `args.statistics_requests()` is set and `collect_statistics=true`. When `collect_statistics=false` it returns `Absent` for everything (the contract is "answer what's free"). `DistinctCount` / `Sum` / `ByteSize` are likewise `Absent` for parquet — those aren't in thrift footers; layered helpers (or richer providers) can fill the gaps.

## Backwards compat

All additions are opt-in:

* `ScanArgs` / `ScanResult` gain new fields with `Default`-friendly initializers; existing callers that don't use the new builders see no change.
* `FilePruner`'s field-type change is internal (private field).
* The only minor source-level break is a new `pub` field on `PartitionedFile` (`satisfied_stats`). Callers using `PartitionedFile::new` / `From<ObjectMeta>` / the existing builders are unaffected. Direct struct literals — uncommon, none in-tree — need to add `satisfied_stats: None` (or use the new `with_satisfied_stats` builder).

## Tests

* `datafusion-common::stats::tests::statistics_request_is_hashable_keyable` — round-trips a `StatisticsRequest` through a `HashMap`.
* `datafusion-pruning::file_pruner::tests` — three tests demonstrating end-to-end pruning against a sparse-only `PartitionedFile` (`x > 100` prunes a `[10, 20]` file, `x > 15` doesn't, no stats at all → no pruner).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
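The request/response handshake above can be sketched end to end with simplified stand-in types. `StatReq`, `StatVal`, and `MockProvider` are invented here for illustration; the real types are `StatisticsRequest`, `StatisticsValue`, and a `TableProvider` implementation.

```rust
// Minimal stand-in sketch of the ScanArgs/ScanResult stats handshake.
// Not the real DataFusion API: all types here are simplified mocks.
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum StatReq {
    RowCount,
    Min(String), // column name
}

#[derive(Debug, Clone, PartialEq)]
enum StatVal {
    Exact(i64),
    Absent, // the provider can't answer this one cheaply
}

// A provider that only knows its row count, mirroring the
// "answer what's free, leave the rest Absent" contract.
struct MockProvider {
    row_count: i64,
}

impl MockProvider {
    // Answers are paired 1:1 with the request slice, like
    // ScanResult::statistics is paired with ScanArgs::statistics_requests.
    fn scan(&self, requests: &[StatReq]) -> Vec<StatVal> {
        requests
            .iter()
            .map(|r| match r {
                StatReq::RowCount => StatVal::Exact(self.row_count),
                _ => StatVal::Absent,
            })
            .collect()
    }
}

fn main() {
    let provider = MockProvider { row_count: 42 };
    let requests = vec![StatReq::RowCount, StatReq::Min("x".to_string())];
    let answers = provider.scan(&requests);

    // The caller pairs requests with responses by index; zipping them
    // into a map mirrors the sparse `satisfied_stats` shape.
    let paired: HashMap<StatReq, StatVal> =
        requests.into_iter().zip(answers).collect();
    assert_eq!(paired[&StatReq::RowCount], StatVal::Exact(42));
    assert_eq!(paired[&StatReq::Min("x".to_string())], StatVal::Absent);
}
```

Because the pairing is by index, a caller can zip requests with responses and keep only the non-`Absent` entries, which is how a sparse per-file map can be built without ever materializing a dense `Statistics`.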
1 parent d648982 commit ea101ed

5 files changed

Lines changed: 548 additions & 13 deletions


datafusion/catalog-listing/src/table.rs

Lines changed: 110 additions & 3 deletions
```diff
@@ -21,9 +21,10 @@ use crate::{ListingOptions, ListingTableConfig};
 use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
 use async_trait::async_trait;
 use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
-use datafusion_common::stats::Precision;
+use datafusion_common::stats::{Precision, StatisticsRequest, StatisticsValue};
 use datafusion_common::{
-    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
+    Constraints, ScalarValue, SchemaExt, Statistics, internal_datafusion_err, plan_err,
+    project_schema,
 };
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_groups::FileGroup;
@@ -515,6 +516,15 @@ impl TableProvider for ListingTable {
             .list_files_for_scan(state, &partition_filters, statistic_file_limit)
             .await?;
 
+        // Snapshot the merged stats if the caller asked for cheap-stats
+        // answers — `statistics` is moved into the FileScanConfig builder
+        // below, so we need a snapshot to read from afterwards. Skipped
+        // entirely when no `statistics_requests` were passed.
+        let stats_snapshot_for_requests = args
+            .statistics_requests()
+            .filter(|_| self.options.collect_stat)
+            .map(|_| statistics.clone());
+
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
             let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
@@ -583,7 +593,24 @@ impl TableProvider for ListingTable {
             )
             .await?;
 
-        Ok(ScanResult::new(plan))
+        // Answer any requested stats from the table-level metadata we
+        // already touched. Anything not derivable from the dense
+        // `Statistics` we computed comes back as `Absent`. Skipped
+        // entirely when the caller didn't ask. We also skip when
+        // `collect_statistics=false` — the contract is "answer what's
+        // free", and computing stats here just to populate this map
+        // would violate that.
+        let stats_responses = match (
+            args.statistics_requests(),
+            stats_snapshot_for_requests.as_ref(),
+        ) {
+            (Some(reqs), Some(stats)) => {
+                answer_statistics_requests(reqs, &self.table_schema, stats)
+            }
+            _ => Vec::new(),
+        };
+
+        Ok(ScanResult::new(plan).with_statistics(stats_responses))
     }
 
     fn supports_filters_pushdown(
@@ -688,6 +715,86 @@ impl TableProvider for ListingTable {
     }
 }
 
+/// Answer a batch of [`StatisticsRequest`]s from a pre-computed
+/// table-level [`Statistics`]. Used by `ListingTable::scan_with_args`
+/// when the caller passes `args.with_statistics_requests(...)`.
+///
+/// Returns a vector aligned 1:1 with `requests`. Anything not derivable
+/// from the merged dense `Statistics` (typically: `DistinctCount`,
+/// `Sum`, `ByteSize`, since parquet thrift footers don't carry those)
+/// returns [`StatisticsValue::Absent`]; the caller decides whether to
+/// fill the gaps via another mechanism.
+fn answer_statistics_requests(
+    requests: &[StatisticsRequest],
+    schema: &SchemaRef,
+    stats: &Statistics,
+) -> Vec<StatisticsValue> {
+    let mut out = Vec::with_capacity(requests.len());
+    for req in requests {
+        out.push(match req {
+            StatisticsRequest::RowCount => {
+                precision_to_scalar(&stats.num_rows, |n| ScalarValue::UInt64(Some(n as u64)))
+            }
+            StatisticsRequest::TotalByteSize => precision_to_scalar(
+                &stats.total_byte_size,
+                |n| ScalarValue::UInt64(Some(n as u64)),
+            ),
+            StatisticsRequest::Min(col) => column_scalar(schema, stats, &col.name, |cs| &cs.min_value),
+            StatisticsRequest::Max(col) => column_scalar(schema, stats, &col.name, |cs| &cs.max_value),
+            StatisticsRequest::Sum(col) => column_scalar(schema, stats, &col.name, |cs| &cs.sum_value),
+            StatisticsRequest::NullCount(col) => column_usize(schema, stats, &col.name, |cs| &cs.null_count),
+            StatisticsRequest::DistinctCount(col) => column_usize(schema, stats, &col.name, |cs| &cs.distinct_count),
+            StatisticsRequest::ByteSize(col) => column_usize(schema, stats, &col.name, |cs| &cs.byte_size),
+        });
+    }
+    out
+}
+
+fn precision_to_scalar(
+    p: &Precision<usize>,
+    map: impl Fn(usize) -> ScalarValue,
+) -> StatisticsValue {
+    match p {
+        Precision::Exact(n) => StatisticsValue::Scalar(Precision::Exact(map(*n))),
+        Precision::Inexact(n) => StatisticsValue::Scalar(Precision::Inexact(map(*n))),
+        Precision::Absent => StatisticsValue::Absent,
+    }
+}
+
+fn column_scalar(
+    schema: &SchemaRef,
+    stats: &Statistics,
+    name: &str,
+    pick: impl Fn(&datafusion_common::ColumnStatistics) -> &Precision<ScalarValue>,
+) -> StatisticsValue {
+    let Ok(idx) = schema.index_of(name) else {
+        return StatisticsValue::Absent;
+    };
+    let Some(cs) = stats.column_statistics.get(idx) else {
+        return StatisticsValue::Absent;
+    };
+    match pick(cs) {
+        Precision::Exact(v) => StatisticsValue::Scalar(Precision::Exact(v.clone())),
+        Precision::Inexact(v) => StatisticsValue::Scalar(Precision::Inexact(v.clone())),
+        Precision::Absent => StatisticsValue::Absent,
+    }
+}
+
+fn column_usize(
+    schema: &SchemaRef,
+    stats: &Statistics,
+    name: &str,
+    pick: impl Fn(&datafusion_common::ColumnStatistics) -> &Precision<usize>,
+) -> StatisticsValue {
+    let Ok(idx) = schema.index_of(name) else {
+        return StatisticsValue::Absent;
+    };
+    let Some(cs) = stats.column_statistics.get(idx) else {
+        return StatisticsValue::Absent;
+    };
+    precision_to_scalar(pick(cs), |n| ScalarValue::UInt64(Some(n as u64)))
+}
+
 impl ListingTable {
     /// Get the list of files for a scan as well as the file level statistics.
     /// The list is grouped to let the execution plan know how the files should
```
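The `precision_to_scalar` helper above preserves the exact/inexact tag through the conversion and short-circuits `Absent`. A minimal self-contained sketch of that pattern, using simplified local `Precision` / `StatValue` types rather than the datafusion-common definitions:

```rust
// Simplified stand-ins for datafusion-common's Precision and
// StatisticsValue, just to show the tag-preserving conversion.
#[derive(Debug, Clone, PartialEq)]
enum Precision<T> {
    Exact(T),
    Inexact(T),
    Absent,
}

#[derive(Debug, Clone, PartialEq)]
enum StatValue {
    Scalar(Precision<u64>),
    Absent,
}

// Map a usize-valued stat into a scalar response: Exact stays Exact,
// Inexact stays Inexact, Absent becomes a top-level Absent response.
fn precision_to_scalar(p: &Precision<usize>, map: impl Fn(usize) -> u64) -> StatValue {
    match p {
        Precision::Exact(n) => StatValue::Scalar(Precision::Exact(map(*n))),
        Precision::Inexact(n) => StatValue::Scalar(Precision::Inexact(map(*n))),
        Precision::Absent => StatValue::Absent,
    }
}

fn main() {
    // An estimated row count stays estimated after conversion.
    assert_eq!(
        precision_to_scalar(&Precision::Inexact(3), |n| n as u64),
        StatValue::Scalar(Precision::Inexact(3))
    );
    // Missing input yields an Absent response, never a guess.
    assert_eq!(
        precision_to_scalar(&Precision::Absent, |n| n as u64),
        StatValue::Absent
    );
}
```

The design point this illustrates: exactness lives in the `Precision` wrapper, so one request kind covers both exact and estimated answers without separate enum variants.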

datafusion/catalog/src/table.rs

Lines changed: 62 additions & 2 deletions
```diff
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use crate::session::Session;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion_common::stats::{StatisticsRequest, StatisticsValue};
 use datafusion_common::{Constraints, Statistics, not_impl_err};
 use datafusion_common::{Result, internal_err};
 use datafusion_expr::Expr;
@@ -406,6 +407,7 @@ pub struct ScanArgs<'a> {
     filters: Option<&'a [Expr]>,
     projection: Option<&'a [usize]>,
     limit: Option<usize>,
+    statistics_requests: Option<&'a [StatisticsRequest]>,
 }
 
 impl<'a> ScanArgs<'a> {
@@ -467,22 +469,68 @@ impl<'a> ScanArgs<'a> {
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
+
+    /// Set a list of statistics the caller would like the provider to
+    /// answer if it can do so cheaply.
+    ///
+    /// Typical sources a provider may answer from:
+    /// * Parquet thrift footers (Min/Max/NullCount/RowCount, exact)
+    /// * An external metadata catalog (Delta/Iceberg/Hudi manifests,
+    ///   Hive-Metastore-style stats columns)
+    /// * Cached / materialized stats columns
+    ///
+    /// The provider returns its answers on
+    /// [`ScanResult::statistics`] paired 1:1 with `requests`. Anything
+    /// not answerable should come back as [`StatisticsValue::Absent`] —
+    /// the caller decides what to do with the gaps. The contract is
+    /// "answer what's free, leave the rest as `Absent`": providers MUST
+    /// NOT do expensive scans purely to satisfy these requests.
+    pub fn with_statistics_requests(
+        mut self,
+        requests: Option<&'a [StatisticsRequest]>,
+    ) -> Self {
+        self.statistics_requests = requests;
+        self
+    }
+
+    /// Get the statistics requests, if any.
+    pub fn statistics_requests(&self) -> Option<&'a [StatisticsRequest]> {
+        self.statistics_requests
+    }
 }
 
 /// Result of a table scan operation from [`TableProvider::scan_with_args`].
 #[derive(Debug, Clone)]
 pub struct ScanResult {
     /// The ExecutionPlan to run.
     plan: Arc<dyn ExecutionPlan>,
+    /// Responses paired 1:1 with `ScanArgs::statistics_requests`. Empty
+    /// if no requests were made or the provider declined to answer any.
+    statistics: Vec<StatisticsValue>,
 }
 
 impl ScanResult {
-    /// Create a new `ScanResult` with the given execution plan.
+    /// Create a new `ScanResult` with the given execution plan and no
+    /// statistics responses.
     ///
     /// # Arguments
     /// * `plan` - The execution plan that will perform the table scan
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        Self { plan }
+        Self {
+            plan,
+            statistics: Vec::new(),
+        }
+    }
+
+    /// Attach statistics responses to this result.
+    ///
+    /// The vector MUST have the same length and ordering as the
+    /// `ScanArgs::statistics_requests` slice that was passed in. Use
+    /// [`StatisticsValue::Absent`] for individual requests the provider
+    /// can't answer; the caller pairs requests with responses by index.
+    pub fn with_statistics(mut self, statistics: Vec<StatisticsValue>) -> Self {
+        self.statistics = statistics;
+        self
     }
 
     /// Get a reference to the execution plan for this scan result.
@@ -493,13 +541,25 @@ impl ScanResult {
         &self.plan
     }
 
+    /// Get the statistics responses, paired with the requests sent in
+    /// via [`ScanArgs::with_statistics_requests`]. Empty if no requests
+    /// were made.
+    pub fn statistics(&self) -> &[StatisticsValue] {
+        &self.statistics
+    }
+
     /// Consume this ScanResult and return the execution plan.
     ///
     /// Returns the owned [`ExecutionPlan`] that will perform
     /// the actual table scanning and data retrieval.
     pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
         self.plan
     }
+
+    /// Consume this ScanResult and return the (plan, statistics) pair.
+    pub fn into_parts(self) -> (Arc<dyn ExecutionPlan>, Vec<StatisticsValue>) {
+        (self.plan, self.statistics)
+    }
 }
 
 impl From<Arc<dyn ExecutionPlan>> for ScanResult {
```
datafusion/common/src/stats.rs

Lines changed: 109 additions & 0 deletions
```diff
@@ -1172,13 +1172,122 @@ impl ColumnStatistics {
     }
 }
 
+// ---------------------------------------------------------------------------
+// Query-aware statistics request / response.
+//
+// A small extension to the existing `Statistics` model: instead of "give me
+// everything you have for every column", a caller can ask for a specific
+// list of stats by name. Providers that have something cheap to offer
+// (parquet thrift footers, an external catalog, cached metadata) answer
+// the entries they can; everything else comes back `Absent`. Callers
+// (optimizer rules, layered helpers, etc.) decide what to do with the
+// gaps.
+//
+// See `TableProvider::scan_with_args` (`ScanArgs::with_statistics_requests`
+// / `ScanResult::statistics`) for the table-level handshake, and
+// `PartitionedFile::satisfied_stats` for the per-file one.
+// ---------------------------------------------------------------------------
+
+/// What stat does the caller want?
+///
+/// Each variant maps onto a field of [`Statistics`] / [`ColumnStatistics`]
+/// so providers that already populate one can answer the other trivially.
+/// The companion [`StatisticsValue`] is paired 1:1 with the request in the
+/// response. Whether a value is exact or estimated is encoded in the
+/// returned [`Precision`] wrapper, not in the request kind itself —
+/// `DistinctCount` covers both an exact distinct count from a metadata
+/// catalog and an HLL-style estimate from a sampled scan.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum StatisticsRequest {
+    /// Smallest non-null value of `column`. Mirrors [`ColumnStatistics::min_value`].
+    Min(crate::Column),
+    /// Largest non-null value of `column`. Mirrors [`ColumnStatistics::max_value`].
+    Max(crate::Column),
+    /// Number of NULLs in `column`. Mirrors [`ColumnStatistics::null_count`].
+    NullCount(crate::Column),
+    /// Number of distinct values in `column` (exact or estimated).
+    /// Mirrors [`ColumnStatistics::distinct_count`].
+    DistinctCount(crate::Column),
+    /// Sum of values in `column` (numerics, widened per
+    /// [`ColumnStatistics::sum_value`]).
+    Sum(crate::Column),
+    /// Encoded/output byte size of `column`. Mirrors
+    /// [`ColumnStatistics::byte_size`].
+    ByteSize(crate::Column),
+    /// Number of rows in the container (table / file). Mirrors
+    /// [`Statistics::num_rows`].
+    RowCount,
+    /// Total byte size of the container's output. Mirrors
+    /// [`Statistics::total_byte_size`].
+    TotalByteSize,
+}
+
+/// Response value paired 1:1 with an inbound [`StatisticsRequest`].
+///
+/// Variants are intentionally schema-agnostic: a provider answering
+/// `Min(c)` returns `Scalar(Precision::Exact(ScalarValue::...))` with
+/// the column's natural type; `RowCount` / `NullCount` /
+/// `DistinctCount` return `Scalar(Precision::*(ScalarValue::UInt64(...)))`.
+///
+/// `Distribution` and `Sketch` are reserved for future use (histograms,
+/// theta/HLL sketches, raw sketch state for downstream merging) — the
+/// optimizer doesn't consume them today, but the type carries an opaque
+/// payload so providers can populate them ahead of optimizer support.
+#[derive(Debug, Clone)]
+pub enum StatisticsValue {
+    /// A single scalar value.
+    Scalar(Precision<ScalarValue>),
+    /// A distribution / histogram. Carries an opaque payload for now.
+    Distribution(std::sync::Arc<dyn std::any::Any + Send + Sync>),
+    /// A sketch (HLL, theta, KLL, …). Carries an opaque payload for now.
+    Sketch(std::sync::Arc<dyn std::any::Any + Send + Sync>),
+    /// Provider can't (or won't) answer this request. The caller decides
+    /// whether to fall back to another mechanism (re-route to a different
+    /// stats source, run a sampled scan, give up, …).
+    Absent,
+}
+
+impl StatisticsValue {
+    /// Convenience: an `Exact` scalar response.
+    pub fn exact(value: ScalarValue) -> Self {
+        Self::Scalar(Precision::Exact(value))
+    }
+    /// Convenience: an `Inexact` scalar response.
+    pub fn inexact(value: ScalarValue) -> Self {
+        Self::Scalar(Precision::Inexact(value))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::assert_contains;
     use arrow::datatypes::Field;
     use std::sync::Arc;
 
+    #[test]
+    fn statistics_request_is_hashable_keyable() {
+        // Sanity: two equal `StatisticsRequest`s hash equal and round-trip
+        // through a HashMap, so they can be used as keys (e.g. for the
+        // sparse `PartitionedFile::satisfied_stats` map).
+        use crate::Column;
+        use std::collections::HashMap;
+        let r1 = StatisticsRequest::Min(Column::new_unqualified("c"));
+        let r2 = StatisticsRequest::Min(Column::new_unqualified("c"));
+        assert_eq!(r1, r2);
+        let mut map: HashMap<StatisticsRequest, StatisticsValue> = HashMap::new();
+        map.insert(
+            r1.clone(),
+            StatisticsValue::exact(ScalarValue::Int64(Some(7))),
+        );
+        match map.get(&r2) {
+            Some(StatisticsValue::Scalar(Precision::Exact(ScalarValue::Int64(
+                Some(7),
+            )))) => {}
+            other => panic!("unexpected lookup: {other:?}"),
+        }
+    }
+
     #[test]
     fn test_get_value() {
         let exact_precision = Precision::Exact(42);
```
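The commit's pruning tests (`x > 100` prunes a `[10, 20]` file, `x > 15` doesn't, no stats means no pruning) boil down to a max-bound check against the sparse per-file map. A simplified sketch of that decision, with a stand-in `StatKey` type rather than the real `StatisticsRequest` / `FilePruner` machinery:

```rust
// Sketch of how a sparse per-file stats map (in the spirit of
// `PartitionedFile::satisfied_stats`) can drive pruning for a
// `column > threshold` predicate. All types here are simplified mocks.
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum StatKey {
    Min(&'static str), // column name
    Max(&'static str),
}

/// Returns true when the file can be skipped for `column > threshold`:
/// if the file's max is known and max <= threshold, no row can match.
fn can_prune_gt(stats: &HashMap<StatKey, i64>, column: &'static str, threshold: i64) -> bool {
    match stats.get(&StatKey::Max(column)) {
        Some(max) => *max <= threshold,
        None => false, // no stats for this column: conservatively keep the file
    }
}

fn main() {
    // A file whose column `x` spans [10, 20], as in the in-tree tests.
    let file_stats: HashMap<StatKey, i64> =
        HashMap::from([(StatKey::Min("x"), 10), (StatKey::Max("x"), 20)]);

    assert!(can_prune_gt(&file_stats, "x", 100)); // x > 100 prunes [10, 20]
    assert!(!can_prune_gt(&file_stats, "x", 15)); // x > 15 might match rows
    assert!(!can_prune_gt(&HashMap::new(), "x", 100)); // no stats: no pruning
}
```

The lookup touches only the entries the predicate needs, which is the point of the sparse adapter: no dense `Statistics` is built just to be thrown away.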
