Skip to content

Commit c850574

Browse files
fix(datafusion): address review findings
- fix: CAST unwrapping in classify_filter — reuse predicate::column_name so time-range predicates are correctly classified as Inexact and passed to scan(); previously CAST-wrapped filters were silently dropped - fix: declare parquet sort order (metric_name, timestamp_secs ASC) on FileScanConfig so DataFusion avoids redundant sort operators - fix: get_opts now respects GetOptions.range — dispatches to get_slice for Bounded/Suffix ranges instead of always downloading the full file - fix: to_object_store_error propagates file path on NotFound - fix: register_for_worker made a no-op; lazy scan-path registration is sufficient and avoids O(indexes) metastore RPCs per worker task; removes stale comment claiming a non-existent object-store cache - fix: extract is_index_not_found helper, removing duplicated downcast block from try_consume_read_rel and create_default_table_provider - fix: sort before dedup in QuickwitSchemaProvider::table_names - fix: empty searcher pool returns Ok(vec![]) for local execution fallback - fix: remove dead builder methods with_udf_batch, with_codec_applier, with_physical_optimizer_rule from DataSourceContributions - feat: add tracing spans to execute_substrait and execute_sql - feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve - refactor: extract stream_to_receiver helper in gRPC handler Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent 0b3831b commit c850574

21 files changed

Lines changed: 431 additions & 516 deletions

File tree

quickwit/Cargo.lock

Lines changed: 38 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-datafusion/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ datafusion-datasource = "52"
4040
datafusion-sql = "52"
4141
datafusion-physical-plan = "52"
4242
datafusion-datasource-parquet = "52"
43-
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed" }
43+
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", rev = "0f2c8be3e148b0bd5c7f17b23f2df8bb1201d5fb" }
4444
object_store = "0.12"
4545

4646
[dev-dependencies]

quickwit/quickwit-datafusion/src/catalog.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ impl SchemaProvider for QuickwitSchemaProvider {
8888
names.append(&mut source_names);
8989
}
9090
}
91-
// Deduplicate in case multiple sources claim the same name.
91+
// Sort then deduplicate in case multiple sources claim the same name.
92+
// `dedup()` only removes consecutive duplicates, so sorting first
93+
// is required to remove all duplicates.
94+
names.sort();
9295
names.dedup();
9396
names
9497
})

quickwit/quickwit-datafusion/src/data_source.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -121,38 +121,12 @@ impl Default for DataSourceContributions {
121121
}
122122

123123
impl DataSourceContributions {
124-
/// Add a physical optimizer rule.
125-
pub fn with_physical_optimizer_rule(
126-
mut self,
127-
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
128-
) -> Self {
129-
self.physical_optimizer_rules.push(rule);
130-
self
131-
}
132-
133124
/// Add a scalar UDF.
134125
pub fn with_udf(mut self, udf: Arc<ScalarUDF>) -> Self {
135126
self.udfs.push(udf);
136127
self
137128
}
138129

139-
/// Add multiple scalar UDFs at once.
140-
pub fn with_udf_batch(mut self, udfs: impl IntoIterator<Item = Arc<ScalarUDF>>) -> Self {
141-
self.udfs.extend(udfs);
142-
self
143-
}
144-
145-
/// Add a codec / builder-extension callback.
146-
///
147-
/// Logs uses this to call `.with_distributed_user_codec(TantivyCodec)`.
148-
pub fn with_codec_applier(
149-
mut self,
150-
f: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder + Send + Sync + 'static,
151-
) -> Self {
152-
self.codec_appliers.push(Box::new(f));
153-
self
154-
}
155-
156130
pub(crate) fn udf_names(&self) -> Vec<String> {
157131
self.udfs.iter().map(|udf| udf.name().to_string()).collect()
158132
}

quickwit/quickwit-datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub(crate) mod substrait;
2323
pub(crate) mod task_estimator;
2424
pub(crate) mod worker;
2525

26+
pub use datafusion::execution::SendableRecordBatchStream;
2627
pub use resolver::QuickwitWorkerResolver;
2728
pub use service::DataFusionService;
2829
pub use session::DataFusionSessionBuilder;

quickwit/quickwit-datafusion/src/resolver.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ impl WorkerResolver for QuickwitWorkerResolver {
5151
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
5252
let addrs: Vec<SocketAddr> = self.searcher_pool.keys();
5353
if addrs.is_empty() {
54-
return Err(DataFusionError::Execution(
55-
"no searcher nodes available in the cluster".to_string(),
56-
));
54+
// Empty pool means no searcher workers are registered (e.g. single-node
55+
// local execution). Return an empty list so the distributed optimizer
56+
// sees zero workers and falls back to local execution rather than
57+
// treating it as a hard error.
58+
return Ok(vec![]);
5759
}
5860
let scheme = if self.use_tls { "https" } else { "http" };
5961
addrs

quickwit/quickwit-datafusion/src/service.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,12 @@ impl DataFusionService {
7777
/// decodes the plan, and returns a streaming `RecordBatch` iterator.
7878
/// The caller decides whether to collect, send via gRPC, or pipe to Arrow
7979
/// Flight — no materialization happens inside this method.
80+
#[tracing::instrument(skip(self, plan_bytes), fields(plan_bytes_len = plan_bytes.len()))]
8081
pub async fn execute_substrait(
8182
&self,
8283
plan_bytes: &[u8],
8384
) -> DFResult<SendableRecordBatchStream> {
85+
tracing::info!(plan_bytes_len = plan_bytes.len(), "executing substrait plan");
8486
use datafusion_substrait::substrait::proto::Plan;
8587
use prost::Message;
8688

@@ -128,7 +130,9 @@ impl DataFusionService {
128130
///
129131
/// Returns an error if `sql` is empty after splitting, or if any statement
130132
/// fails to parse or execute.
133+
#[tracing::instrument(skip(self, sql), fields(sql_len = sql.len()))]
131134
pub async fn execute_sql(&self, sql: &str) -> DFResult<SendableRecordBatchStream> {
135+
tracing::info!(sql_len = sql.len(), "executing SQL query");
132136
let ctx = self.builder.build_session()?;
133137

134138
// Split on `;` and discard empty fragments (trailing `;` etc.).

quickwit/quickwit-datafusion/src/session.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,11 @@
3636
//! variant), use `with_worker_resolver(resolver)` to supply any type that
3737
//! implements `datafusion_distributed::WorkerResolver`.
3838
//!
39-
//! ## Result materialization
39+
//! ## Result streaming
4040
//!
41-
//! `execute_substrait` collects all result batches into memory before returning.
42-
//! For large rollup queries this is unsuitable for production use. A streaming
43-
//! variant is deferred; A downstream caller can wrap this via its own gRPC handler.
44-
//! Use `with_memory_limit()` to bound memory usage until streaming is in place.
41+
//! `execute_substrait` returns a `SendableRecordBatchStream` backed by the
42+
//! DataFusion streaming executor — no intermediate materialization occurs.
43+
//! Use `with_memory_limit()` to bound query memory for large rollup queries.
4544
4645
use std::collections::HashSet;
4746
use std::sync::Arc;
@@ -193,31 +192,31 @@ impl DataFusionSessionBuilder {
193192
Ok(())
194193
}
195194

196-
/// Execute a Substrait plan (protobuf bytes) and return the results.
195+
/// Execute a Substrait plan (protobuf bytes) and return a streaming result.
197196
///
198197
/// Builds a fresh session, converts the plan via `QuickwitSubstraitConsumer`,
199-
/// and collects all results into memory. See the module-level doc on
200-
/// materialization limits.
198+
/// and returns a `SendableRecordBatchStream` that the caller can consume
199+
/// incrementally — no intermediate materialization occurs here.
201200
pub async fn execute_substrait(
202201
&self,
203202
plan_bytes: &[u8],
204-
) -> DFResult<Vec<arrow::array::RecordBatch>> {
203+
) -> DFResult<datafusion::physical_plan::SendableRecordBatchStream> {
205204
use datafusion_substrait::substrait::proto::Plan;
206205
use prost::Message;
207206

208207
let plan = Plan::decode(plan_bytes)
209208
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
210209

211210
let ctx = self.build_session()?;
212-
crate::substrait::execute_substrait_plan(&plan, &ctx, &self.sources).await
211+
crate::substrait::execute_substrait_plan_streaming(&plan, &ctx, &self.sources).await
213212
}
214213

215214
/// Build a `SessionContext` backed by the shared `RuntimeEnv`.
216215
///
217216
/// Does NOT call `check_invariants()` — callers should invoke that once at
218217
/// startup, not on every query.
219218
pub fn build_session(&self) -> DFResult<SessionContext> {
220-
let mut config = SessionConfig::new().with_target_partitions(1);
219+
let mut config = SessionConfig::new();
221220
config.options_mut().catalog.default_catalog = "quickwit".to_string();
222221
config.options_mut().catalog.default_schema = "public".to_string();
223222
config.options_mut().catalog.information_schema = true;
@@ -231,23 +230,27 @@ impl DataFusionSessionBuilder {
231230
// at startup (via init) or lazily (via scan) are globally visible.
232231
.with_runtime_env(Arc::clone(&self.runtime));
233232

233+
// Accumulate contributions from all sources and apply them first, so that
234+
// source-specific physical optimizer rules (e.g. tantivy pushdown) run
235+
// before the distributed rule inspects the fully-optimized plan.
236+
let mut combined = crate::data_source::DataSourceContributions::default();
237+
for source in &self.sources {
238+
combined.merge(source.contributions());
239+
}
240+
builder = combined.apply_to_builder(builder);
241+
234242
if let Some(resolver) = &self.worker_resolver {
235243
// Clone the Arc so ownership passes into the distributed extension.
236244
// `Arc<dyn WorkerResolver>` implements `WorkerResolver` via deref,
237245
// so the forwarding wrapper is not needed.
246+
// DistributedPhysicalOptimizerRule is added LAST so it sees the
247+
// fully source-optimized plan.
238248
builder = builder
239249
.with_distributed_worker_resolver(ArcWorkerResolver(Arc::clone(resolver)))
240250
.with_distributed_task_estimator(QuickwitTaskEstimator)
241251
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule));
242252
}
243253

244-
// Accumulate contributions from all sources and apply them at once.
245-
let mut combined = crate::data_source::DataSourceContributions::default();
246-
for source in &self.sources {
247-
combined.merge(source.contributions());
248-
}
249-
builder = combined.apply_to_builder(builder);
250-
251254
let mut state = builder.build();
252255

253256
for source in &self.sources {

quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -98,20 +98,9 @@ impl MetricsSplitProvider for MetastoreSplitProvider {
9898

9999
/// Convert a DataFusion `MetricsSplitQuery` to a metastore `ListMetricsSplitsQuery`.
100100
///
101-
/// Note: The OSS parquet column names are bare (service, env, etc.) but the
102-
/// metastore `ListMetricsSplitsQuery` still uses the `tag_service`, `tag_env`
103-
/// field names — this is just the metastore's internal naming convention.
104-
///
105-
/// # Tag field pushdown limitation
106-
///
107-
/// `ListMetricsSplitsQuery` accepts at most one value per tag field
108-
/// (`Option<String>`). When a DataFusion `IN (...)` predicate produces
109-
/// multiple candidate values for a tag column, the metastore cannot express
110-
/// the full filter, so **no metastore-level pruning is applied for that
111-
/// dimension** — the value is left as `None`. The parquet-level filter
112-
/// (applied after the split is opened) will still enforce the predicate
113-
/// correctly. Only single-value equalities (`WHERE service = 'web'`) or
114-
/// single-element IN lists are pushed down to the metastore.
101+
/// Only metric name and time range are forwarded — the only dimensions the
102+
/// metastore reliably populates today. Tag-based pruning will be wired once
103+
/// the zonemap/bloom-filter mechanism is in place.
115104
fn to_metastore_query(index_uid: &IndexUid, query: &MetricsSplitQuery) -> ListMetricsSplitsQuery {
116105
let mut metastore_query = ListMetricsSplitsQuery::for_index(index_uid.clone());
117106

@@ -127,27 +116,8 @@ fn to_metastore_query(index_uid: &IndexUid, query: &MetricsSplitQuery) -> ListMe
127116
metastore_query.time_range_end = Some(end as i64);
128117
}
129118

130-
// Push down a tag filter to the metastore only when there is exactly one
131-
// candidate value. Multi-value IN lists cannot be expressed as a single
132-
// `Option<String>` on `ListMetricsSplitsQuery`; passing only the first
133-
// value would silently skip splits that match the other values, producing
134-
// incorrect (incomplete) results. For multi-value lists we pass `None`
135-
// (no metastore pruning) and rely on the parquet-level filter instead.
136-
metastore_query.tag_service = single_value(query.tag_service.as_deref());
137-
metastore_query.tag_env = single_value(query.tag_env.as_deref());
138-
metastore_query.tag_datacenter = single_value(query.tag_datacenter.as_deref());
139-
metastore_query.tag_region = single_value(query.tag_region.as_deref());
140-
metastore_query.tag_host = single_value(query.tag_host.as_deref());
141119

142120
metastore_query
143121
}
144122

145-
/// Returns the single element of `values` as `Some(value)`, or `None` if
146-
/// `values` is absent, empty, or contains more than one element.
147-
fn single_value(values: Option<&[String]>) -> Option<String> {
148-
match values {
149-
Some([single]) => Some(single.clone()),
150-
_ => None,
151-
}
152-
}
153123

0 commit comments

Comments
 (0)