Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 10 additions & 52 deletions crates/core/common/src/context/query.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use std::{
collections::BTreeMap,
pin::Pin,
sync::{Arc, LazyLock},
task::{Context, Poll},
};

use arrow::{array::ArrayRef, compute::concat_batches, datatypes::SchemaRef};
use arrow::{array::ArrayRef, compute::concat_batches};
use datafusion::{
self,
arrow::array::RecordBatch,
catalog::MemorySchemaProvider,
error::DataFusionError,
execution::{
RecordBatchStream, SendableRecordBatchStream, SessionStateBuilder, config::SessionConfig,
context::SessionContext, memory_pool::human_readable_size, runtime_env::RuntimeEnv,
SendableRecordBatchStream, SessionStateBuilder, config::SessionConfig,
context::SessionContext, runtime_env::RuntimeEnv,
},
logical_expr::LogicalPlan,
physical_optimizer::PhysicalOptimizerRule,
Expand All @@ -25,7 +23,7 @@ use datafusion_tracing::{
InstrumentationOptions, instrument_with_info_spans, pretty_format_compact_batch,
};
use datasets_common::network_id::NetworkId;
use futures::{Stream, TryStreamExt, stream};
use futures::{TryStreamExt, stream};
use regex::Regex;
use tracing::field;

Expand Down Expand Up @@ -82,6 +80,11 @@ impl QueryContext {
})
}

/// Borrows the shared handle to the tiered memory pool owned by this query context.
///
/// The handle is returned by reference; callers that need to keep the pool alive
/// independently of the context can clone the returned `Arc`.
pub fn memory_pool(&self) -> &Arc<TieredMemoryPool> {
    let pool = &self.tiered_memory_pool;
    pool
}

/// Returns the catalog snapshot backing this query context.
pub fn catalog(&self) -> &CatalogSnapshot {
&self.catalog
Expand Down Expand Up @@ -120,10 +123,7 @@ impl QueryContext {
.await
.map_err(ExecutePlanError::Execute)?;

Ok(PeakMemoryStream::wrap(
result,
self.tiered_memory_pool.clone(),
))
Ok(result)
}

/// This will load the result set entirely in memory, so it should be used with caution.
Expand Down Expand Up @@ -535,48 +535,6 @@ fn print_physical_plan(plan: &dyn ExecutionPlan) -> String {
sanitize_parquet_paths(&plan_str)
}

/// A stream wrapper that logs peak memory usage when dropped.
///
/// Because `execute_plan` returns a lazy `SendableRecordBatchStream`, memory is only
/// allocated when the stream is consumed. This wrapper defers the peak memory log to
/// when the stream is dropped (i.e., after consumption or cancellation).
///
/// The wrapper adds no behavior while the stream is being consumed: polling and schema
/// lookups are delegated to `inner` unchanged.
struct PeakMemoryStream {
/// The wrapped record batch stream that produces the actual results.
inner: SendableRecordBatchStream,
/// Memory pool whose `peak_reserved()` value is reported in the log emitted on drop.
pool: Arc<TieredMemoryPool>,
}

impl PeakMemoryStream {
    /// Wraps `inner` so that the peak reservation of `pool` is logged once the
    /// returned stream is dropped.
    ///
    /// The result is boxed and pinned so it can be handed out wherever a plain
    /// `SendableRecordBatchStream` is expected.
    fn wrap(
        inner: SendableRecordBatchStream,
        pool: Arc<TieredMemoryPool>,
    ) -> SendableRecordBatchStream {
        let wrapped = Self { inner, pool };
        Box::pin(wrapped)
    }
}

impl Drop for PeakMemoryStream {
    /// Emits a debug-level log with the pool's peak reservation when the stream is
    /// dropped, whether it was fully consumed or abandoned early.
    fn drop(&mut self) {
        // `human_readable_size` renders the value with a unit of its own choosing,
        // so the field name must not promise megabytes. `peak_memory` also matches
        // the field name used for the same log line in the server metrics code.
        tracing::debug!(
            peak_memory = human_readable_size(self.pool.peak_reserved()),
            "Query memory usage"
        );
    }
}

impl Stream for PeakMemoryStream {
    type Item = Result<RecordBatch, DataFusionError>;

    /// Polls the wrapped stream; the wrapper adds no behavior of its own here.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner.as_mut().poll_next(cx)
    }

    /// Forwards the wrapped stream's size hint.
    ///
    /// Without this override the trait default of `(0, None)` would hide any bounds
    /// the inner stream is able to report.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

impl RecordBatchStream for PeakMemoryStream {
    /// Reports the schema of the wrapped stream; wrapping does not alter it.
    fn schema(&self) -> SchemaRef {
        let Self { inner, .. } = self;
        inner.schema()
    }
}

/// Creates an instrumentation rule that captures metrics and provides previews of data during execution.
pub fn create_instrumentation_rule() -> Arc<dyn PhysicalOptimizerRule + Send + Sync> {
let options_builder = InstrumentationOptions::builder()
Expand Down
16 changes: 12 additions & 4 deletions crates/services/server/src/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use common::{
},
dataset_store::{DatasetStore, GetDatasetError},
detached_logical_plan::{AttachPlanError, DetachedLogicalPlan},
memory_pool::TieredMemoryPool,
query_env::QueryEnv,
sql::{
ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references,
Expand Down Expand Up @@ -213,6 +214,7 @@ impl Service {
);
}

let memory_pool = ctx.memory_pool().clone();
let record_batches = ctx
.execute_plan(plan, true)
.await
Expand All @@ -224,7 +226,12 @@ impl Service {
};

if let Some(metrics) = &self.metrics {
Ok(track_query_metrics(stream, metrics, query_start_time))
Ok(track_query_metrics(
stream,
metrics,
query_start_time,
Some(memory_pool),
))
} else {
Ok(stream)
}
Expand Down Expand Up @@ -278,7 +285,7 @@ impl Service {
};

if let Some(metrics) = &self.metrics {
Ok(track_query_metrics(stream, metrics, query_start_time))
Ok(track_query_metrics(stream, metrics, query_start_time, None))
} else {
Ok(stream)
}
Expand Down Expand Up @@ -617,6 +624,7 @@ fn track_query_metrics(
stream: QueryResultStream,
metrics: &Arc<MetricsRegistry>,
start_time: std::time::Instant,
memory_pool: Option<Arc<TieredMemoryPool>>,
) -> QueryResultStream {
let metrics = metrics.clone();

Expand Down Expand Up @@ -647,7 +655,7 @@ fn track_query_metrics(
let duration = start_time.elapsed().as_millis() as f64;
let err_msg = e.to_string();
metrics.record_query_error(&err_msg);
metrics.record_query_execution(duration, total_rows, total_bytes);
metrics.record_query_execution(duration, total_rows, total_bytes, memory_pool.as_ref());

yield Err(e);
return;
Expand All @@ -657,7 +665,7 @@ fn track_query_metrics(

// Stream completed successfully, record metrics
let duration = start_time.elapsed().as_millis() as f64;
metrics.record_query_execution(duration, total_rows, total_bytes);
metrics.record_query_execution(duration, total_rows, total_bytes, memory_pool.as_ref());
};

QueryResultStream::NonIncremental {
Expand Down
18 changes: 13 additions & 5 deletions crates/services/server/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::sync::Arc;

use common::memory_pool::TieredMemoryPool;
use datafusion::execution::memory_pool::human_readable_size;
use monitoring::telemetry;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -133,11 +137,20 @@ impl MetricsRegistry {
duration_millis: f64,
rows_returned: u64,
bytes_egress: u64,
memory_pool: Option<&Arc<TieredMemoryPool>>,
) {
self.query_count.inc();
self.query_duration.record(duration_millis);
self.query_rows_returned.inc_by(rows_returned);
self.query_bytes_egress.inc_by(bytes_egress);
if let Some(pool) = memory_pool {
let peak = pool.peak_reserved() as u64;
self.query_memory_peak_bytes.record(peak);
tracing::debug!(
peak_memory = human_readable_size(peak as usize),
"Query memory usage"
);
}
}

/// Record query error
Expand Down Expand Up @@ -166,9 +179,4 @@ impl MetricsRegistry {
pub fn record_streaming_lifetime(&self, duration_millis: f64) {
    // Feed the lifetime (in milliseconds, per the parameter name) into the
    // streaming-query lifetime metric.
    let lifetime_metric = &self.streaming_query_lifetime;
    lifetime_metric.record(duration_millis);
}

/// Records a query's peak memory usage, in bytes, on the peak-memory metric.
pub fn record_query_memory(&self, peak_bytes: u64) {
    let peak_metric = &self.query_memory_peak_bytes;
    peak_metric.record(peak_bytes);
}
}