Skip to content

Commit 2e271bd

Browse files
authored
Save query metrics to state store (#109)
* Save query metrics to state store * Save query metrics to state store * Save query metrics to state store * Disable default * Fix test * Fix test
1 parent e1cf4fe commit 2e271bd

7 files changed

Lines changed: 189 additions & 17 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/executor/src/models.rs

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use crate::query_types::QueryId;
22
use datafusion::arrow::array::RecordBatch;
33
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
44
use datafusion_common::arrow::datatypes::Schema;
5+
use datafusion_physical_plan::metrics::{Metric, MetricsSet};
56
use functions::to_snowflake_datatype;
67
use serde::{Deserialize, Serialize};
8+
use serde_json::{Map as JsonMap, Value as JsonValue, json};
79
use std::collections::HashMap;
810
use std::sync::Arc;
911
use uuid::Uuid;
@@ -73,20 +75,63 @@ pub struct QueryResult {
7375
/// The schema associated with the result.
7476
/// This is required to construct a valid response even when `records` are empty
7577
pub schema: Arc<ArrowSchema>,
78+
/// Execution plan metrics collected after running the query.
79+
pub metrics: Vec<QueryMetric>,
7680
}
7781

7882
impl QueryResult {
7983
#[must_use]
8084
pub const fn new(records: Vec<RecordBatch>, schema: Arc<ArrowSchema>) -> Self {
81-
Self { records, schema }
85+
Self {
86+
records,
87+
schema,
88+
metrics: Vec::new(),
89+
}
8290
}
8391

92+
#[must_use]
93+
pub const fn new_with_metrics(
94+
records: Vec<RecordBatch>,
95+
schema: Arc<ArrowSchema>,
96+
metrics: Vec<QueryMetric>,
97+
) -> Self {
98+
Self {
99+
records,
100+
schema,
101+
metrics,
102+
}
103+
}
84104
#[must_use]
85105
pub fn column_info(&self) -> Vec<ColumnInfo> {
86106
ColumnInfo::from_schema(&self.schema)
87107
}
88108
}
89109

110+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
111+
pub struct QueryMetric {
112+
pub node_id: usize,
113+
pub parent_node_id: Option<usize>,
114+
pub operator: String,
115+
pub metrics: serde_json::Value, // serialized metrics as a JSON array of metric objects (see `metrics_set_to_json`)
116+
}
117+
118+
impl QueryMetric {
119+
#[must_use]
120+
pub fn new(
121+
node_id: usize,
122+
parent_node_id: Option<usize>,
123+
operator: &str,
124+
metrics: serde_json::Value,
125+
) -> Self {
126+
Self {
127+
node_id,
128+
parent_node_id,
129+
operator: operator.to_string(),
130+
metrics,
131+
}
132+
}
133+
}
134+
90135
// TODO: We should not have serde dependency here
91136
// Instead it should be in api-snowflake-rest
92137
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -196,6 +241,40 @@ impl ColumnInfo {
196241
}
197242
}
198243

244+
#[must_use]
245+
pub fn metrics_set_to_json(metrics: Option<MetricsSet>) -> JsonValue {
246+
let metrics = metrics
247+
.map(|metrics| {
248+
metrics
249+
.iter()
250+
.map(|metric| metric_to_json(metric.as_ref()))
251+
.collect()
252+
})
253+
.unwrap_or_default();
254+
JsonValue::Array(metrics)
255+
}
256+
257+
fn metric_to_json(metric: &Metric) -> JsonValue {
258+
let labels = metric
259+
.labels()
260+
.iter()
261+
.map(|label| {
262+
(
263+
label.name().to_string(),
264+
JsonValue::String(label.value().to_string()),
265+
)
266+
})
267+
.collect::<JsonMap<String, JsonValue>>();
268+
269+
json!({
270+
"name": metric.value().name(),
271+
"value": metric.value().as_usize(),
272+
"display": metric.value().to_string(),
273+
"partition": metric.partition(),
274+
"labels": labels,
275+
})
276+
}
277+
199278
#[cfg(test)]
200279
mod tests {
201280
use crate::models::ColumnInfo;

crates/executor/src/query.rs

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::datafusion::physical_plan::merge::{
1717
};
1818
use crate::datafusion::rewriters::session_context::SessionContextExprRewriter;
1919
use crate::error::{OperationOn, OperationType};
20-
use crate::models::{QueryContext, QueryResult};
20+
use crate::models::{QueryContext, QueryMetric, QueryResult, metrics_set_to_json};
2121
use catalog::table::{CachingTable, IcebergTableBuilder};
2222
use catalog_metastore::{
2323
AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
@@ -68,7 +68,7 @@ use datafusion_expr::{
6868
};
6969
use datafusion_iceberg::DataFusionTable;
7070
use datafusion_iceberg::table::DataFusionTableConfigBuilder;
71-
use datafusion_physical_plan::collect;
71+
use datafusion_physical_plan::{ExecutionPlan, collect};
7272
use functions::semi_structured::variant::visitors::visit_all;
7373
use functions::session_params::SessionProperty;
7474
use functions::visitors::{
@@ -105,6 +105,7 @@ use std::ops::ControlFlow;
105105
use std::result::Result as StdResult;
106106
use std::str::FromStr;
107107
use std::sync::Arc;
108+
use std::sync::atomic::{AtomicUsize, Ordering};
108109
use tracing::Instrument;
109110
use tracing_attributes::instrument;
110111
use url::Url;
@@ -2130,11 +2131,24 @@ impl UserQuery {
21302131
.await
21312132
.context(ex_error::DataFusionSnafu)?;
21322133
let mut schema = df.schema().as_arrow().clone();
2133-
let records = df.collect().await.context(ex_error::DataFusionSnafu)?;
2134+
let physical_plan = df
2135+
.create_physical_plan()
2136+
.await
2137+
.context(ex_error::DataFusionSnafu)?;
2138+
let task_ctx = session.ctx.task_ctx();
2139+
let metrics_plan = Arc::clone(&physical_plan);
2140+
let records = collect(physical_plan, task_ctx)
2141+
.await
2142+
.context(ex_error::DataFusionSnafu)?;
21342143
if !records.is_empty() {
21352144
schema = records[0].schema().as_ref().clone();
21362145
}
2137-
Ok::<QueryResult, Error>(QueryResult::new(records, Arc::new(schema)))
2146+
let metrics = build_plan_metrics(&metrics_plan);
2147+
Ok::<QueryResult, Error>(QueryResult::new_with_metrics(
2148+
records,
2149+
Arc::new(schema),
2150+
metrics,
2151+
))
21382152
}
21392153

21402154
#[instrument(name = "UserQuery::execute_sql", level = "debug", skip(self), err, ret)]
@@ -2164,19 +2178,30 @@ impl UserQuery {
21642178
span: tracing::Span,
21652179
) -> Result<QueryResult> {
21662180
let mut schema = plan.schema().as_arrow().clone();
2167-
let records = session
2181+
let df = session
21682182
.ctx
21692183
.execute_logical_plan(plan)
21702184
.await
2171-
.context(ex_error::DataFusionSnafu)?
2172-
.collect()
2185+
.context(ex_error::DataFusionSnafu)?;
2186+
let physical_plan = df
2187+
.create_physical_plan()
2188+
.await
2189+
.context(ex_error::DataFusionSnafu)?;
2190+
let task_ctx = session.ctx.task_ctx();
2191+
let metrics_plan = Arc::clone(&physical_plan);
2192+
let records = collect(physical_plan, task_ctx)
21732193
.instrument(span)
21742194
.await
21752195
.context(ex_error::DataFusionSnafu)?;
21762196
if !records.is_empty() {
21772197
schema = records[0].schema().as_ref().clone();
21782198
}
2179-
Ok::<QueryResult, Error>(QueryResult::new(records, Arc::new(schema)))
2199+
let metrics = build_plan_metrics(&metrics_plan);
2200+
Ok::<QueryResult, Error>(QueryResult::new_with_metrics(
2201+
records,
2202+
Arc::new(schema),
2203+
metrics,
2204+
))
21802205
}
21812206

21822207
async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<QueryResult> {
@@ -2221,13 +2246,19 @@ impl UserQuery {
22212246
.optimize(physical_plan, &ConfigOptions::new())
22222247
.context(ex_error::DataFusionSnafu)?;
22232248
}
2249+
let metrics_plan = Arc::clone(&physical_plan);
22242250
let records = collect(physical_plan, Arc::new(task_ctx))
22252251
.await
22262252
.context(ex_error::DataFusionSnafu)?;
22272253
if !records.is_empty() {
22282254
schema = records[0].schema().as_ref().clone();
22292255
}
2230-
Ok::<QueryResult, Error>(QueryResult::new(records, Arc::new(schema)))
2256+
let metrics = build_plan_metrics(&metrics_plan);
2257+
Ok::<QueryResult, Error>(QueryResult::new_with_metrics(
2258+
records,
2259+
Arc::new(schema),
2260+
metrics,
2261+
))
22312262
}
22322263

22332264
#[instrument(
@@ -3549,3 +3580,26 @@ fn normalize_resolved_ref(table_ref: &ResolvedTableReference) -> ResolvedTableRe
35493580
table: Arc::from(table_ref.table.to_ascii_lowercase()),
35503581
}
35513582
}
3583+
3584+
fn build_plan_metrics(plan: &Arc<dyn ExecutionPlan>) -> Vec<QueryMetric> {
3585+
let counter = AtomicUsize::new(0);
3586+
let mut metrics = Vec::new();
3587+
collect_plan_metrics(plan, None, &counter, &mut metrics);
3588+
metrics
3589+
}
3590+
3591+
fn collect_plan_metrics(
3592+
plan: &Arc<dyn ExecutionPlan>,
3593+
parent: Option<usize>,
3594+
counter: &AtomicUsize,
3595+
out: &mut Vec<QueryMetric>,
3596+
) {
3597+
let node_id = counter.fetch_add(1, Ordering::SeqCst);
3598+
let metrics_json = metrics_set_to_json(plan.metrics());
3599+
3600+
out.push(QueryMetric::new(node_id, parent, plan.name(), metrics_json));
3601+
3602+
for child in plan.children() {
3603+
collect_plan_metrics(child, Some(node_id), counter, out);
3604+
}
3605+
}

crates/executor/src/query_task_result.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use super::models::QueryResult;
55
use super::query_types::ExecutionStatus;
66
use super::snowflake_error::SnowflakeError;
77
use snafu::ResultExt;
8+
use state_store::QueryMetric;
89
use tokio::task::JoinError;
910
use uuid::Uuid;
1011

@@ -17,6 +18,7 @@ pub struct ExecutionTaskResult {
1718
}
1819

1920
impl ExecutionTaskResult {
21+
#[must_use]
2022
pub fn from_query_result(query_id: Uuid, result: Result<QueryResult>) -> Self {
2123
let execution_status = result
2224
.as_ref()
@@ -86,6 +88,21 @@ impl ExecutionTaskResult {
8688
if let Err(err) = &self.result {
8789
query.set_error_message(err.to_string());
8890
}
91+
92+
// Collect result metrics
93+
if let Ok(res) = &self.result {
94+
query.set_query_metrics(
95+
res.metrics
96+
.iter()
97+
.map(|metric| QueryMetric {
98+
node_id: metric.node_id,
99+
parent_node_id: metric.parent_node_id,
100+
operator: metric.operator.clone(),
101+
metrics: metric.metrics.clone(),
102+
})
103+
.collect(),
104+
);
105+
}
89106
query.set_end_time();
90107
}
91108
}

crates/executor/src/tests/statestore_queries_unittest.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,15 @@ async fn test_query_lifecycle_ok_query() {
9999
"end_time": "2026-01-01T01:01:01.000000001Z",
100100
"execution_time": "1",
101101
"query_hash": "12320374230549905548",
102-
"query_hash_version": 1
102+
"query_hash_version": 1,
103+
"query_metrics": [
104+
{
105+
"node_id": 0,
106+
"parent_node_id": null,
107+
"operator": "EmptyExec",
108+
"metrics": []
109+
}
110+
]
103111
}
104112
"#);
105113
});

crates/state-store/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub mod state_store_dynamo;
55

66
pub use config::DynamoDbConfig;
77
pub use error::{Error, Result};
8-
pub use models::{ExecutionStatus, Query, SessionRecord, Variable, ViewRecord};
8+
pub use models::{ExecutionStatus, Query, QueryMetric, SessionRecord, Variable, ViewRecord};
99
pub use state_store_dynamo::DynamoDbStateStore;
1010

1111
#[mockall::automock]

crates/state-store/src/models.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ pub struct Query {
279279
#[serde(default, skip_serializing_if = "Option::is_none")]
280280
pub bind_values: Option<Value>,
281281
#[serde(default, skip_serializing_if = "Option::is_none")]
282+
pub query_metrics: Option<Vec<QueryMetric>>,
283+
#[serde(default, skip_serializing_if = "Option::is_none")]
282284
pub query_history_time: Option<u64>,
283285
#[serde(default, skip_serializing_if = "Option::is_none")]
284286
pub query_result_time: Option<u64>,
@@ -331,6 +333,10 @@ impl Query {
331333
self.error_message = Some(error_message);
332334
}
333335

336+
pub fn set_query_metrics(&mut self, metrics: Vec<QueryMetric>) {
337+
self.query_metrics = Some(metrics);
338+
}
339+
334340
pub fn set_warehouse_type(&mut self, warehouse_type: String) {
335341
self.warehouse_type = Some(warehouse_type);
336342
}
@@ -342,3 +348,11 @@ impl Query {
342348
self.execution_time = Some((end_time - self.start_time).num_milliseconds() as u64);
343349
}
344350
}
351+
352+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
353+
pub struct QueryMetric {
354+
pub node_id: usize,
355+
pub parent_node_id: Option<usize>,
356+
pub operator: String,
357+
pub metrics: Value, // serialized metrics as a JSON array of metric objects
358+
}

0 commit comments

Comments (0)