109 changes: 84 additions & 25 deletions crates/services/server/src/flight.rs
@@ -4,7 +4,11 @@
//! binary query execution. It includes the Service struct that handles query execution
//! and the FlightService trait implementation for the Arrow Flight protocol.

use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
pin::Pin,
sync::Arc,
};

use amp_data_store::DataStore;
use arrow_flight::{
@@ -59,6 +63,7 @@ use datafusion::{
common::DFSchema, error::DataFusionError, physical_plan::stream::RecordBatchStreamAdapter,
};
use datasets_common::{
hash_reference::HashReference,
network_id::NetworkId,
partial_reference::{PartialReference, PartialReferenceError},
};
@@ -68,7 +73,7 @@ use futures::{
};
use js_runtime::isolate_pool::IsolatePool;
use metadata_db::{MetadataDb, NotificationMultiplexerHandle, notification_multiplexer};
use monitoring::telemetry::metrics::Meter;
use monitoring::telemetry::metrics::{KeyValue, Meter};
use prost::Message as _;
use serde_json::json;
use thiserror::Error;
@@ -151,6 +156,7 @@ impl Service {

let is_streaming =
is_streaming.unwrap_or_else(|| common::stream_helpers::is_streaming(&query));
let dataset_labels = catalog_dataset_labels(&catalog);
let result = self
.execute_plan(
catalog,
@@ -161,7 +167,7 @@
)
.await;

// Record execution error
// Record execution error, once per dataset
if result.is_err()
&& let Some(metrics) = &self.metrics
{
@@ -170,7 +176,9 @@
.err()
.map(|err| err.error_code())
.unwrap_or("UNKNOWN_ERROR");
metrics.record_query_error(error_code);
for dataset in &dataset_labels {
metrics.record_query_error(error_code, dataset);
}
}

result
@@ -186,6 +194,7 @@
resume_watermark: Option<ResumeWatermark>,
) -> Result<QueryResultStream, Error> {
let query_start_time = std::time::Instant::now();
let dataset_labels = catalog_dataset_labels(&catalog);
let schema = {
let schema = plan.schema();
let schema: &arrow::datatypes::Schema = schema.as_ref().as_ref();
@@ -224,7 +233,12 @@
};

if let Some(metrics) = &self.metrics {
Ok(track_query_metrics(stream, metrics, query_start_time))
Ok(track_query_metrics(
stream,
metrics,
query_start_time,
dataset_labels,
))
} else {
Ok(stream)
}
@@ -278,7 +292,12 @@
};

if let Some(metrics) = &self.metrics {
Ok(track_query_metrics(stream, metrics, query_start_time))
Ok(track_query_metrics(
stream,
metrics,
query_start_time,
dataset_labels,
))
} else {
Ok(stream)
}
@@ -612,11 +631,23 @@ fn ipc_schema(schema: &DFSchema) -> Bytes {
bytes.into_inner().into()
}

/// Returns a sorted, deduplicated list of references to the datasets backing the
/// catalog's tables. Used as per-dataset labels on query metrics.
fn catalog_dataset_labels(catalog: &Catalog) -> Vec<HashReference> {
let datasets: BTreeSet<HashReference> = catalog
.tables()
.iter()
.map(|t| t.dataset_reference().clone())
.collect();
datasets.into_iter().collect()
}

/// Wrap a query result stream with metrics tracking
fn track_query_metrics(
stream: QueryResultStream,
metrics: &Arc<MetricsRegistry>,
start_time: std::time::Instant,
dataset_labels: Vec<HashReference>,
) -> QueryResultStream {
let metrics = metrics.clone();

@@ -643,21 +674,25 @@ fn track_query_metrics(
yield Ok(batch);
}
Err(e) => {
// Record metrics on error
// Record metrics on error, once per dataset
let duration = start_time.elapsed().as_millis() as f64;
let err_msg = e.to_string();
metrics.record_query_error(&err_msg);
metrics.record_query_execution(duration, total_rows, total_bytes);
for dataset in &dataset_labels {
metrics.record_query_error(&err_msg, dataset);
metrics.record_query_execution(duration, total_rows, total_bytes, dataset);
}

yield Err(e);
return;
}
}
}

// Stream completed successfully, record metrics
// Stream completed successfully, record metrics once per dataset
let duration = start_time.elapsed().as_millis() as f64;
metrics.record_query_execution(duration, total_rows, total_bytes);
for dataset in &dataset_labels {
metrics.record_query_execution(duration, total_rows, total_bytes, dataset);
}
};

QueryResultStream::NonIncremental {
@@ -670,9 +705,18 @@ fn track_query_metrics(
stream: message_stream,
schema,
} => {
// Increment active streaming query counter
metrics.streaming_queries_active.inc();
metrics.streaming_queries_started.inc();
// Increment active streaming query counters, once per dataset
for dataset in &dataset_labels {
let kv = [
KeyValue::new(
"dataset",
format!("{}/{}", dataset.namespace(), dataset.name()),
),
KeyValue::new("dataset_name", dataset.name().as_str().to_string()),
];
metrics.streaming_queries_active.inc_with_kvs(&kv);
metrics.streaming_queries_started.inc_with_kvs(&kv);
}

let wrapped = stream! {
let mut microbatch_start: Option<std::time::Instant> = None;
@@ -687,13 +731,16 @@
QueryMessage::Data(ref batch) => {
let batch_rows = batch.num_rows() as u64;
let batch_bytes = batch.get_array_memory_size() as u64;
// Record incremental throughput per batch (counters track cumulative totals)
metrics.record_streaming_batch(batch_rows, batch_bytes);
for dataset in &dataset_labels {
metrics.record_streaming_batch(batch_rows, batch_bytes, dataset);
}
}
QueryMessage::MicrobatchEnd(_) => {
if let Some(start) = microbatch_start.take() {
let duration = start.elapsed().as_millis() as f64;
metrics.record_streaming_microbatch_duration(duration);
for dataset in &dataset_labels {
metrics.record_streaming_microbatch_duration(duration, dataset);
}
}
}
QueryMessage::BlockComplete(_) => {}
@@ -702,23 +749,35 @@
yield Ok(message);
}
Err(e) => {
// Record metrics on error
// Record metrics on error, once per dataset
let duration = start_time.elapsed().as_millis() as f64;
metrics.streaming_queries_completed.inc();
metrics.streaming_queries_active.dec();
metrics.record_streaming_lifetime(duration);
for dataset in &dataset_labels {
metrics.record_streaming_lifetime(duration, dataset);
let kv = [
KeyValue::new("dataset", format!("{}/{}", dataset.namespace(), dataset.name())),
KeyValue::new("dataset_name", dataset.name().as_str().to_string()),
];
metrics.streaming_queries_completed.inc_with_kvs(&kv);
metrics.streaming_queries_active.dec_with_kvs(&kv);
}

yield Err(e);
return;
}
}
}

// Record metrics on success
// Record metrics on success, once per dataset
let duration = start_time.elapsed().as_millis() as f64;
metrics.streaming_queries_completed.inc();
metrics.streaming_queries_active.dec();
metrics.record_streaming_lifetime(duration);
for dataset in &dataset_labels {
metrics.record_streaming_lifetime(duration, dataset);
let kv = [
KeyValue::new("dataset", format!("{}/{}", dataset.namespace(), dataset.name())),
KeyValue::new("dataset_name", dataset.name().as_str().to_string()),
];
metrics.streaming_queries_completed.inc_with_kvs(&kv);
metrics.streaming_queries_active.dec_with_kvs(&kv);
}
}
.boxed();

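The core pattern the flight.rs changes introduce is: collect every dataset reference the catalog touches into a sorted, duplicate-free list once, then fan each metric record out over that list. Below is a minimal, self-contained sketch of that deduplication step. `DatasetRef` and `dedup_dataset_refs` are hypothetical stand-ins for `datasets_common`'s `HashReference` and the new `catalog_dataset_labels` helper; only the `BTreeSet` round-trip mirrors the actual change.

```rust
use std::collections::BTreeSet;

// Stand-in for datasets_common::hash_reference::HashReference. The Ord derive
// is what makes the BTreeSet dedup-and-sort trick work.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct DatasetRef {
    namespace: String,
    name: String,
}

// Mirrors catalog_dataset_labels: collect into a BTreeSet so the list is
// sorted and duplicate-free, then flatten back into a Vec for iteration.
fn dedup_dataset_refs(refs: impl IntoIterator<Item = DatasetRef>) -> Vec<DatasetRef> {
    refs.into_iter().collect::<BTreeSet<_>>().into_iter().collect()
}

fn main() {
    let raw = vec![
        DatasetRef { namespace: "eth".into(), name: "blocks".into() },
        DatasetRef { namespace: "eth".into(), name: "blocks".into() }, // duplicate table ref
        DatasetRef { namespace: "eth".into(), name: "logs".into() },
    ];
    // Two entries survive; each metric record is then emitted once per
    // distinct dataset rather than once per table occurrence.
    for d in dedup_dataset_refs(raw) {
        println!("{}/{}", d.namespace, d.name);
    }
}
```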
75 changes: 55 additions & 20 deletions crates/services/server/src/metrics.rs
@@ -1,3 +1,4 @@
use datasets_common::hash_reference::HashReference;
use monitoring::telemetry;

#[derive(Debug, Clone)]
@@ -133,42 +134,76 @@ impl MetricsRegistry {
duration_millis: f64,
rows_returned: u64,
bytes_egress: u64,
dataset: &HashReference,
) {
self.query_count.inc();
self.query_duration.record(duration_millis);
self.query_rows_returned.inc_by(rows_returned);
self.query_bytes_egress.inc_by(bytes_egress);
let labels = dataset_kvs(dataset);
self.query_count.inc_with_kvs(&labels);
self.query_duration
.record_with_kvs(duration_millis, &labels);
self.query_rows_returned
.inc_by_with_kvs(rows_returned, &labels);
self.query_bytes_egress
.inc_by_with_kvs(bytes_egress, &labels);
}

/// Record query error
pub fn record_query_error(&self, error_code: &str) {
let labels = [telemetry::metrics::KeyValue::new(
pub fn record_query_error(&self, error_code: &str, dataset: &HashReference) {
let mut labels = dataset_kvs(dataset).to_vec();
labels.push(telemetry::metrics::KeyValue::new(
"error_code",
error_code.to_string(),
)];

));
self.query_errors.inc_with_kvs(&labels);
}

/// Record query memory usage
pub fn record_query_memory(&self, peak_bytes: u64) {
self.query_memory_peak_bytes.record(peak_bytes);
}

/// Record streaming microbatch size and throughput
pub fn record_streaming_batch(&self, batch_rows: u64, batch_bytes: u64) {
self.streaming_microbatch_rows.record(batch_rows);
self.streaming_rows_sent.inc_by(batch_rows);
self.streaming_bytes_sent.inc_by(batch_bytes);
pub fn record_streaming_batch(
&self,
batch_rows: u64,
batch_bytes: u64,
dataset: &HashReference,
) {
let labels = dataset_kvs(dataset);
self.streaming_microbatch_rows
.record_with_kvs(batch_rows, &labels);
self.streaming_rows_sent
.inc_by_with_kvs(batch_rows, &labels);
self.streaming_bytes_sent
.inc_by_with_kvs(batch_bytes, &labels);
}

/// Record streaming microbatch duration
pub fn record_streaming_microbatch_duration(&self, duration_millis: f64) {
self.streaming_microbatch_duration.record(duration_millis);
pub fn record_streaming_microbatch_duration(
&self,
duration_millis: f64,
dataset: &HashReference,
) {
let labels = dataset_kvs(dataset);
self.streaming_microbatch_duration
.record_with_kvs(duration_millis, &labels);
}

/// Record streaming query lifetime
pub fn record_streaming_lifetime(&self, duration_millis: f64) {
self.streaming_query_lifetime.record(duration_millis);
pub fn record_streaming_lifetime(&self, duration_millis: f64, dataset: &HashReference) {
let labels = dataset_kvs(dataset);
self.streaming_query_lifetime
.record_with_kvs(duration_millis, &labels);
}
}

/// Build the two standard dataset labels from a `HashReference`:
/// `dataset` = `namespace/name`, `dataset_name` = just the name.
fn dataset_kvs(dataset: &HashReference) -> [telemetry::metrics::KeyValue; 2] {
[
telemetry::metrics::KeyValue::new(
"dataset",
format!("{}/{}", dataset.namespace(), dataset.name()),
),
telemetry::metrics::KeyValue::new("dataset_name", dataset.name().as_str().to_string()),
]
}
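A minimal sketch of the label shape the new `dataset_kvs` helper produces. The `KeyValue` struct here is a hypothetical stand-in for `monitoring::telemetry::metrics::KeyValue`, and the sketch takes plain strings rather than a `HashReference`; only the two label keys and their formats mirror the diff. Emitting both a qualified `dataset` label and a bare `dataset_name` label presumably lets dashboards group either by fully qualified reference or by table name alone.

```rust
// Stand-in for monitoring::telemetry::metrics::KeyValue.
#[derive(Debug, Clone, PartialEq)]
struct KeyValue {
    key: &'static str,
    value: String,
}

impl KeyValue {
    fn new(key: &'static str, value: String) -> Self {
        Self { key, value }
    }
}

// Mirrors dataset_kvs: two labels per dataset, one fully qualified and one
// carrying just the table-level name.
fn dataset_kvs(namespace: &str, name: &str) -> [KeyValue; 2] {
    [
        // "dataset" carries the qualified reference, e.g. "eth/blocks".
        KeyValue::new("dataset", format!("{namespace}/{name}")),
        // "dataset_name" carries only the name, e.g. "blocks".
        KeyValue::new("dataset_name", name.to_string()),
    ]
}

fn main() {
    let kvs = dataset_kvs("eth", "blocks");
    assert_eq!(kvs[0].value, "eth/blocks");
    assert_eq!(kvs[1].value, "blocks");
}
```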