From b772ef0d33eb1902b802fd12c59534451239f1ca Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Wed, 18 Feb 2026 15:41:59 +0000 Subject: [PATCH 1/2] feat(server): label query metrics with dataset Add per-dataset dimension to all query metrics so operators can filter throughput, errors, and latency by dataset in observability dashboards. - Add `catalog_dataset_labels()` to extract unique `namespace/name` labels from the catalog - Add `dataset` parameter to all `MetricsRegistry` recording methods - Emit metrics once per referenced dataset for cross-dataset queries - Label streaming counters (`active`, `started`, `completed`) with dataset key-values Signed-off-by: Leonardo Yvens --- crates/services/server/src/flight.rs | 99 ++++++++++++++++++++------- crates/services/server/src/metrics.rs | 70 ++++++++++++------- 2 files changed, 121 insertions(+), 48 deletions(-) diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 0705d7f80..165e44bf7 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -4,7 +4,11 @@ //! binary query execution. It includes the Service struct that handles query execution //! and the FlightService trait implementation for the Arrow Flight protocol. 
-use std::{collections::BTreeMap, pin::Pin, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + pin::Pin, + sync::Arc, +}; use amp_data_store::DataStore; use arrow_flight::{ @@ -68,7 +72,7 @@ use futures::{ }; use js_runtime::isolate_pool::IsolatePool; use metadata_db::{MetadataDb, NotificationMultiplexerHandle, notification_multiplexer}; -use monitoring::telemetry::metrics::Meter; +use monitoring::telemetry::metrics::{KeyValue, Meter}; use prost::Message as _; use serde_json::json; use thiserror::Error; @@ -151,6 +155,7 @@ impl Service { let is_streaming = is_streaming.unwrap_or_else(|| common::stream_helpers::is_streaming(&query)); + let dataset_labels = catalog_dataset_labels(&catalog); let result = self .execute_plan( catalog, @@ -161,7 +166,7 @@ impl Service { ) .await; - // Record execution error + // Record execution error, once per dataset if result.is_err() && let Some(metrics) = &self.metrics { @@ -170,7 +175,9 @@ impl Service { .err() .map(|err| err.error_code()) .unwrap_or("UNKNOWN_ERROR"); - metrics.record_query_error(error_code); + for dataset in &dataset_labels { + metrics.record_query_error(error_code, dataset); + } } result @@ -186,6 +193,7 @@ impl Service { resume_watermark: Option, ) -> Result { let query_start_time = std::time::Instant::now(); + let dataset_labels = catalog_dataset_labels(&catalog); let schema = { let schema = plan.schema(); let schema: &arrow::datatypes::Schema = schema.as_ref().as_ref(); @@ -224,7 +232,12 @@ impl Service { }; if let Some(metrics) = &self.metrics { - Ok(track_query_metrics(stream, metrics, query_start_time)) + Ok(track_query_metrics( + stream, + metrics, + query_start_time, + dataset_labels, + )) } else { Ok(stream) } @@ -278,7 +291,12 @@ impl Service { }; if let Some(metrics) = &self.metrics { - Ok(track_query_metrics(stream, metrics, query_start_time)) + Ok(track_query_metrics( + stream, + metrics, + query_start_time, + dataset_labels, + )) } else { Ok(stream) } @@ -612,11 +630,26 @@ fn 
ipc_schema(schema: &DFSchema) -> Bytes { bytes.into_inner().into() } +/// Returns a deduplicated sorted list of unique `namespace/name` values for all +/// datasets referenced by the catalog. Used as per-dataset `dataset` labels on query metrics. +fn catalog_dataset_labels(catalog: &Catalog) -> Vec { + let datasets: BTreeSet = catalog + .tables() + .iter() + .map(|t| { + let r = t.dataset_reference(); + format!("{}/{}", r.namespace(), r.name()) + }) + .collect(); + datasets.into_iter().collect() +} + /// Wrap a query result stream with metrics tracking fn track_query_metrics( stream: QueryResultStream, metrics: &Arc, start_time: std::time::Instant, + dataset_labels: Vec, ) -> QueryResultStream { let metrics = metrics.clone(); @@ -643,11 +676,13 @@ fn track_query_metrics( yield Ok(batch); } Err(e) => { - // Record metrics on error + // Record metrics on error, once per dataset let duration = start_time.elapsed().as_millis() as f64; let err_msg = e.to_string(); - metrics.record_query_error(&err_msg); - metrics.record_query_execution(duration, total_rows, total_bytes); + for dataset in &dataset_labels { + metrics.record_query_error(&err_msg, dataset); + metrics.record_query_execution(duration, total_rows, total_bytes, dataset); + } yield Err(e); return; @@ -655,9 +690,11 @@ fn track_query_metrics( } } - // Stream completed successfully, record metrics + // Stream completed successfully, record metrics once per dataset let duration = start_time.elapsed().as_millis() as f64; - metrics.record_query_execution(duration, total_rows, total_bytes); + for dataset in &dataset_labels { + metrics.record_query_execution(duration, total_rows, total_bytes, dataset); + } }; QueryResultStream::NonIncremental { @@ -670,9 +707,12 @@ fn track_query_metrics( stream: message_stream, schema, } => { - // Increment active streaming query counter - metrics.streaming_queries_active.inc(); - metrics.streaming_queries_started.inc(); + // Increment active streaming query counters, once per dataset 
+ for dataset in &dataset_labels { + let kv = [KeyValue::new("dataset", dataset.clone())]; + metrics.streaming_queries_active.inc_with_kvs(&kv); + metrics.streaming_queries_started.inc_with_kvs(&kv); + } let wrapped = stream! { let mut microbatch_start: Option = None; @@ -687,13 +727,16 @@ fn track_query_metrics( QueryMessage::Data(ref batch) => { let batch_rows = batch.num_rows() as u64; let batch_bytes = batch.get_array_memory_size() as u64; - // Record incremental throughput per batch (counters track cumulative totals) - metrics.record_streaming_batch(batch_rows, batch_bytes); + for dataset in &dataset_labels { + metrics.record_streaming_batch(batch_rows, batch_bytes, dataset); + } } QueryMessage::MicrobatchEnd(_) => { if let Some(start) = microbatch_start.take() { let duration = start.elapsed().as_millis() as f64; - metrics.record_streaming_microbatch_duration(duration); + for dataset in &dataset_labels { + metrics.record_streaming_microbatch_duration(duration, dataset); + } } } QueryMessage::BlockComplete(_) => {} @@ -702,11 +745,14 @@ fn track_query_metrics( yield Ok(message); } Err(e) => { - // Record metrics on error + // Record metrics on error, once per dataset let duration = start_time.elapsed().as_millis() as f64; - metrics.streaming_queries_completed.inc(); - metrics.streaming_queries_active.dec(); - metrics.record_streaming_lifetime(duration); + for dataset in &dataset_labels { + metrics.record_streaming_lifetime(duration, dataset); + let kv = [KeyValue::new("dataset", dataset.clone())]; + metrics.streaming_queries_completed.inc_with_kvs(&kv); + metrics.streaming_queries_active.dec_with_kvs(&kv); + } yield Err(e); return; @@ -714,11 +760,14 @@ fn track_query_metrics( } } - // Record metrics on success + // Record metrics on success, once per dataset let duration = start_time.elapsed().as_millis() as f64; - metrics.streaming_queries_completed.inc(); - metrics.streaming_queries_active.dec(); - metrics.record_streaming_lifetime(duration); + for dataset 
in &dataset_labels { + metrics.record_streaming_lifetime(duration, dataset); + let kv = [KeyValue::new("dataset", dataset.clone())]; + metrics.streaming_queries_completed.inc_with_kvs(&kv); + metrics.streaming_queries_active.dec_with_kvs(&kv); + } } .boxed(); diff --git a/crates/services/server/src/metrics.rs b/crates/services/server/src/metrics.rs index dd8af1e78..fe374ba95 100644 --- a/crates/services/server/src/metrics.rs +++ b/crates/services/server/src/metrics.rs @@ -133,42 +133,66 @@ impl MetricsRegistry { duration_millis: f64, rows_returned: u64, bytes_egress: u64, + dataset: &str, ) { - self.query_count.inc(); - self.query_duration.record(duration_millis); - self.query_rows_returned.inc_by(rows_returned); - self.query_bytes_egress.inc_by(bytes_egress); - } - - /// Record query error - pub fn record_query_error(&self, error_code: &str) { let labels = [telemetry::metrics::KeyValue::new( - "error_code", - error_code.to_string(), + "dataset", + dataset.to_string(), )]; + self.query_count.inc_with_kvs(&labels); + self.query_duration + .record_with_kvs(duration_millis, &labels); + self.query_rows_returned + .inc_by_with_kvs(rows_returned, &labels); + self.query_bytes_egress + .inc_by_with_kvs(bytes_egress, &labels); + } + /// Record query error + pub fn record_query_error(&self, error_code: &str, dataset: &str) { + let labels = [ + telemetry::metrics::KeyValue::new("error_code", error_code.to_string()), + telemetry::metrics::KeyValue::new("dataset", dataset.to_string()), + ]; self.query_errors.inc_with_kvs(&labels); } + /// Record query memory usage + pub fn record_query_memory(&self, peak_bytes: u64) { + self.query_memory_peak_bytes.record(peak_bytes); + } + /// Record streaming microbatch size and throughput - pub fn record_streaming_batch(&self, batch_rows: u64, batch_bytes: u64) { - self.streaming_microbatch_rows.record(batch_rows); - self.streaming_rows_sent.inc_by(batch_rows); - self.streaming_bytes_sent.inc_by(batch_bytes); + pub fn 
record_streaming_batch(&self, batch_rows: u64, batch_bytes: u64, dataset: &str) { + let labels = [telemetry::metrics::KeyValue::new( + "dataset", + dataset.to_string(), + )]; + self.streaming_microbatch_rows + .record_with_kvs(batch_rows, &labels); + self.streaming_rows_sent + .inc_by_with_kvs(batch_rows, &labels); + self.streaming_bytes_sent + .inc_by_with_kvs(batch_bytes, &labels); } /// Record streaming microbatch duration - pub fn record_streaming_microbatch_duration(&self, duration_millis: f64) { - self.streaming_microbatch_duration.record(duration_millis); + pub fn record_streaming_microbatch_duration(&self, duration_millis: f64, dataset: &str) { + let labels = [telemetry::metrics::KeyValue::new( + "dataset", + dataset.to_string(), + )]; + self.streaming_microbatch_duration + .record_with_kvs(duration_millis, &labels); } /// Record streaming query lifetime - pub fn record_streaming_lifetime(&self, duration_millis: f64) { - self.streaming_query_lifetime.record(duration_millis); - } - - /// Record query memory usage - pub fn record_query_memory(&self, peak_bytes: u64) { - self.query_memory_peak_bytes.record(peak_bytes); + pub fn record_streaming_lifetime(&self, duration_millis: f64, dataset: &str) { + let labels = [telemetry::metrics::KeyValue::new( + "dataset", + dataset.to_string(), + )]; + self.streaming_query_lifetime + .record_with_kvs(duration_millis, &labels); } } From 358521475cb8424e7858f51bda8b599506077177 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Wed, 18 Feb 2026 17:25:16 +0000 Subject: [PATCH 2/2] feat(server): add dataset_name as separate label on query metrics Expose the dataset name without namespace prefix as a dedicated label to simplify metric queries and aggregations that don't need namespace context. 
- Add `dataset_name` label alongside existing `dataset` (`namespace/name`) on all query metrics - Introduce `dataset_kvs()` helper in `metrics.rs` to centralize both label construction - Refactor metrics methods to accept `&HashReference` instead of `&str` for type safety - Update `catalog_dataset_labels` to return `Vec<HashReference>` instead of `Vec<String>` Signed-off-by: Leonardo Yvens --- crates/services/server/src/flight.rs | 34 +++++++++------ crates/services/server/src/metrics.rs | 61 ++++++++++++++++----------- 2 files changed, 58 insertions(+), 37 deletions(-) diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 165e44bf7..3bdd610a4 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -63,6 +63,7 @@ use datafusion::{ common::DFSchema, error::DataFusionError, physical_plan::stream::RecordBatchStreamAdapter, }; use datasets_common::{ + hash_reference::HashReference, network_id::NetworkId, partial_reference::{PartialReference, PartialReferenceError}, }; @@ -630,16 +631,13 @@ fn ipc_schema(schema: &DFSchema) -> Bytes { bytes.into_inner().into() } -/// Returns a deduplicated sorted list of unique `namespace/name` values for all -/// datasets referenced by the catalog. Used as per-dataset `dataset` labels on query metrics. -fn catalog_dataset_labels(catalog: &Catalog) -> Vec<String> { - let datasets: BTreeSet<String> = catalog +/// Returns a deduplicated sorted list of dataset references for all datasets referenced +/// by the catalog. Used as per-dataset labels on query metrics. 
+fn catalog_dataset_labels(catalog: &Catalog) -> Vec<HashReference> { + let datasets: BTreeSet<HashReference> = catalog .tables() .iter() - .map(|t| { - let r = t.dataset_reference(); - format!("{}/{}", r.namespace(), r.name()) - }) + .map(|t| t.dataset_reference().clone()) .collect(); datasets.into_iter().collect() } @@ -649,7 +647,7 @@ fn track_query_metrics( stream: QueryResultStream, metrics: &Arc<MetricsRegistry>, start_time: std::time::Instant, - dataset_labels: Vec<String>, + dataset_labels: Vec<HashReference>, ) -> QueryResultStream { let metrics = metrics.clone(); @@ -709,7 +707,13 @@ } => { // Increment active streaming query counters, once per dataset for dataset in &dataset_labels { - let kv = [KeyValue::new("dataset", dataset.clone())]; + let kv = [ + KeyValue::new( + "dataset", + format!("{}/{}", dataset.namespace(), dataset.name()), + ), + KeyValue::new("dataset_name", dataset.name().as_str().to_string()), + ]; metrics.streaming_queries_active.inc_with_kvs(&kv); metrics.streaming_queries_started.inc_with_kvs(&kv); } @@ -749,7 +753,10 @@ let duration = start_time.elapsed().as_millis() as f64; for dataset in &dataset_labels { metrics.record_streaming_lifetime(duration, dataset); - let kv = [KeyValue::new("dataset", dataset.clone())]; + let kv = [ + KeyValue::new("dataset", format!("{}/{}", dataset.namespace(), dataset.name())), + KeyValue::new("dataset_name", dataset.name().as_str().to_string()), + ]; metrics.streaming_queries_completed.inc_with_kvs(&kv); metrics.streaming_queries_active.dec_with_kvs(&kv); } @@ -764,7 +771,10 @@ let duration = start_time.elapsed().as_millis() as f64; for dataset in &dataset_labels { metrics.record_streaming_lifetime(duration, dataset); - let kv = [KeyValue::new("dataset", dataset.clone())]; + let kv = [ + KeyValue::new("dataset", format!("{}/{}", dataset.namespace(), dataset.name())), + KeyValue::new("dataset_name", dataset.name().as_str().to_string()), + ]; metrics.streaming_queries_completed.inc_with_kvs(&kv); 
metrics.streaming_queries_active.dec_with_kvs(&kv); } diff --git a/crates/services/server/src/metrics.rs b/crates/services/server/src/metrics.rs index fe374ba95..437cadf8f 100644 --- a/crates/services/server/src/metrics.rs +++ b/crates/services/server/src/metrics.rs @@ -1,3 +1,4 @@ +use datasets_common::hash_reference::HashReference; use monitoring::telemetry; #[derive(Debug, Clone)] @@ -133,12 +134,9 @@ impl MetricsRegistry { duration_millis: f64, rows_returned: u64, bytes_egress: u64, - dataset: &str, + dataset: &HashReference, ) { - let labels = [telemetry::metrics::KeyValue::new( - "dataset", - dataset.to_string(), - )]; + let labels = dataset_kvs(dataset); self.query_count.inc_with_kvs(&labels); self.query_duration .record_with_kvs(duration_millis, &labels); @@ -149,11 +147,12 @@ impl MetricsRegistry { } /// Record query error - pub fn record_query_error(&self, error_code: &str, dataset: &str) { - let labels = [ - telemetry::metrics::KeyValue::new("error_code", error_code.to_string()), - telemetry::metrics::KeyValue::new("dataset", dataset.to_string()), - ]; + pub fn record_query_error(&self, error_code: &str, dataset: &HashReference) { + let mut labels = dataset_kvs(dataset).to_vec(); + labels.push(telemetry::metrics::KeyValue::new( + "error_code", + error_code.to_string(), + )); self.query_errors.inc_with_kvs(&labels); } @@ -163,11 +162,13 @@ impl MetricsRegistry { } /// Record streaming microbatch size and throughput - pub fn record_streaming_batch(&self, batch_rows: u64, batch_bytes: u64, dataset: &str) { - let labels = [telemetry::metrics::KeyValue::new( - "dataset", - dataset.to_string(), - )]; + pub fn record_streaming_batch( + &self, + batch_rows: u64, + batch_bytes: u64, + dataset: &HashReference, + ) { + let labels = dataset_kvs(dataset); self.streaming_microbatch_rows .record_with_kvs(batch_rows, &labels); self.streaming_rows_sent @@ -177,22 +178,32 @@ impl MetricsRegistry { } /// Record streaming microbatch duration - pub fn 
record_streaming_microbatch_duration(&self, duration_millis: f64, dataset: &str) { - let labels = [telemetry::metrics::KeyValue::new( - "dataset", - dataset.to_string(), - )]; + pub fn record_streaming_microbatch_duration( + &self, + duration_millis: f64, + dataset: &HashReference, + ) { + let labels = dataset_kvs(dataset); self.streaming_microbatch_duration .record_with_kvs(duration_millis, &labels); } /// Record streaming query lifetime - pub fn record_streaming_lifetime(&self, duration_millis: f64, dataset: &str) { - let labels = [telemetry::metrics::KeyValue::new( - "dataset", - dataset.to_string(), - )]; + pub fn record_streaming_lifetime(&self, duration_millis: f64, dataset: &HashReference) { + let labels = dataset_kvs(dataset); self.streaming_query_lifetime .record_with_kvs(duration_millis, &labels); } } + +/// Build the two standard dataset labels from a `HashReference`: +/// `dataset` = `namespace/name`, `dataset_name` = just the name. +fn dataset_kvs(dataset: &HashReference) -> [telemetry::metrics::KeyValue; 2] { + [ + telemetry::metrics::KeyValue::new( + "dataset", + format!("{}/{}", dataset.namespace(), dataset.name()), + ), + telemetry::metrics::KeyValue::new("dataset_name", dataset.name().as_str().to_string()), + ] +}