diff --git a/src/axum.rs b/src/axum.rs index 38ba1b5..0d55e4c 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -70,7 +70,10 @@ where HandlerCtx::new( None, self.task_set.clone(), - TracingInfo::new_with_context(self.router.service_name(), parent_context), + Arc::new(TracingInfo::new_with_context( + self.router.service_name(), + parent_context, + )), ) } } diff --git a/src/lib.rs b/src/lib.rs index 5497472..3781fd0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,7 +90,7 @@ //! //! Routers can also be served over axum websockets. When both `axum` and //! `pubsub` features are enabled, the `pubsub` module provides -//! [`pubsub::AxumWsCfg`] and the [`pubsub::ajj_websocket`] axum handler. This +//! `pubsub::AxumWsCfg` and the `pubsub::ajj_websocket` axum handler. This //! handler will serve the router over websockets at a specific route. The //! router is a property of the `AxumWsCfg` object, and is passed to the //! handler via axum's `State` extractor. diff --git a/src/macros.rs b/src/macros.rs index 4cfbc37..54b3181 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -61,6 +61,7 @@ macro_rules! unwrap_infallible { /// should rely on the histogram metric for size visibility. /// /// See +#[cfg(any(feature = "axum", feature = "pubsub"))] macro_rules! message_event { ($type:literal, service: $service:expr, counter: $counter:expr, bytes: $bytes:expr,) => {{ let bytes = $bytes; diff --git a/src/metrics.rs b/src/metrics.rs index 5027d57..9058a9a 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,4 +1,6 @@ -use metrics::{counter, gauge, histogram, Counter, Gauge, Histogram}; +use metrics::{counter, gauge, Counter, Gauge}; +#[cfg(any(feature = "axum", feature = "pubsub"))] +use metrics::{histogram, Histogram}; use std::sync::LazyLock; /// Metric name for counting router calls. @@ -256,6 +258,7 @@ fn record_completed_call(service_name: &'static str, method: &str) { } /// Get or register a histogram for message sizes on a specific service and direction. +#[cfg(any(feature = "axum", feature = "pubsub"))] fn message_size(service_name: &'static str, direction: &'static str) -> Histogram { let _ = &DESCRIBE; histogram!( @@ -266,6 +269,7 @@ fn message_size(service_name: &'static str, direction: &'static str) -> Histogra } /// Record the uncompressed size in bytes of a message sent or received on a service. +#[cfg(any(feature = "axum", feature = "pubsub"))] pub(crate) fn record_message_size( service_name: &'static str, direction: &'static str, diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index 54f64a2..fb5a368 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -114,7 +114,6 @@ pub mod ipc { } mod shared; -pub(crate) use shared::WriteItem; pub use shared::{ConnectionId, DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT}; mod shutdown; diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index 3797e0a..4675ee7 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -1,10 +1,10 @@ use crate::{ pubsub::{In, JsonSink, Listener, Out}, + routes::WriteItem, types::InboundData, HandlerCtx, TaskSet, TracingInfo, }; use core::fmt; -use serde_json::value::RawValue; use std::sync::{ atomic::{AtomicU32, AtomicU64, Ordering}, Arc, @@ -148,7 +148,6 @@ impl ConnectionManager { items: rx, connection, tx_msg_id: self.tx_msg_id.clone(), - service_name: self.router.service_name(), }; (rt, wt) @@ -259,7 +258,7 @@ where // possible, and then given to the Handler ctx. It // will be populated with request-specific details // (e.g. method) during ctx instantiation. - let tracing = TracingInfo::new(router.service_name()); + let tracing = Arc::new(TracingInfo::new(router.service_name())); let ctx = HandlerCtx::new( @@ -269,11 +268,11 @@ where ); ctx.init_request_span(&router, None); - let span = ctx.span().clone(); - span.in_scope(|| { + let tracing_for_reply = Arc::clone(&ctx.tracing); + ctx.span().in_scope(|| { message_event!( @received, - service: router.service_name(), + service: tracing_for_reply.service, counter: &rx_msg_id, bytes: item_bytes, ); @@ -289,7 +288,7 @@ where // as the task is done regardless. if let Some(json) = fut.await { let _ = permit.send( - WriteItem { span, json } + WriteItem { tracing: tracing_for_reply, json } ); } } @@ -310,13 +309,6 @@ where } } -/// An item to be written to an outbound JSON pubsub stream. -#[derive(Debug, Clone)] -pub(crate) struct WriteItem { - pub(crate) span: tracing::Span, - pub(crate) json: Box, -} - /// The Write Task is responsible for writing JSON to the outbound connection. struct WriteTask { /// Task set @@ -336,9 +328,6 @@ struct WriteTask { /// Counter for OTEL messages sent. pub(crate) tx_msg_id: Arc, - - /// Service name, used to tag the `ajj.router.message_size_bytes` histogram. - pub(crate) service_name: &'static str, } impl WriteTask { @@ -356,7 +345,6 @@ impl WriteTask { mut items, mut connection, tx_msg_id, - service_name, .. } = self; @@ -369,14 +357,15 @@ impl WriteTask { break; } item = items.recv() => { - let Some(WriteItem { span, json }) = item else { - tracing::error!("Json stream has closed"); + let Some(WriteItem { tracing, json }) = item else { + ::tracing::error!("Json stream has closed"); break; }; + let span = tracing.request_span().clone(); span.in_scope(|| { message_event!( @sent, - service: service_name, + service: tracing.service, counter: &tx_msg_id, bytes: json.get().len(), ); diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index 39e5b50..1c88587 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -1,17 +1,30 @@ -use crate::{pubsub::WriteItem, types::Request, Router, RpcSend, TaskSet}; +use crate::{types::Request, Router, RpcSend, TaskSet}; use ::tracing::info_span; use opentelemetry::trace::TraceContextExt; use serde_json::value::RawValue; -use std::{future::Future, sync::OnceLock}; +use std::{ + future::Future, + sync::{Arc, OnceLock}, +}; use tokio::{ sync::mpsc::{self, error::SendError}, task::JoinHandle, }; +#[cfg(feature = "pubsub")] use tokio_stream::StreamExt; use tokio_util::sync::WaitForCancellationFutureOwned; use tracing::{enabled, Level}; use tracing_opentelemetry::OpenTelemetrySpanExt; +/// An item to be written to an outbound JSON pubsub stream. Produced by +/// [`HandlerCtx`] notification senders and consumed by the pubsub write task. +#[derive(Debug, Clone)] +#[cfg_attr(not(feature = "pubsub"), allow(dead_code))] +pub(crate) struct WriteItem { + pub(crate) tracing: Arc, + pub(crate) json: Box, +} + /// Errors that can occur when sending notifications. #[derive(thiserror::Error, Debug)] pub enum NotifyError { @@ -39,7 +52,7 @@ impl From> for NotifyError { #[derive(Debug)] pub struct NotifyPermit<'a> { permit: mpsc::Permit<'a, WriteItem>, - span: tracing::Span, + tracing: Arc, } impl<'a> NotifyPermit<'a> { @@ -50,7 +63,7 @@ impl<'a> NotifyPermit<'a> { pub fn send(self, t: T) -> Result<(), serde_json::Error> { let json = t.into_raw_value()?; self.permit.send(WriteItem { - span: self.span, + tracing: self.tracing, json, }); Ok(()) @@ -69,7 +82,7 @@ impl<'a> NotifyPermit<'a> { #[derive(Debug)] pub struct OwnedNotifyPermit { permit: mpsc::OwnedPermit, - span: tracing::Span, + tracing: Arc, } impl OwnedNotifyPermit { @@ -80,7 +93,7 @@ impl OwnedNotifyPermit { pub fn send(self, t: T) -> Result<(), serde_json::Error> { let json = t.into_raw_value()?; self.permit.send(WriteItem { - span: self.span, + tracing: self.tracing, json, }); Ok(()) @@ -201,7 +214,7 @@ impl TracingInfo { /// Panics if the span has not been initialized via /// [`Self::init_request_span`]. #[track_caller] - fn request_span(&self) -> &tracing::Span { + pub(crate) fn request_span(&self) -> &tracing::Span { self.span.get().expect("span not initialized") } @@ -232,16 +245,18 @@ pub struct HandlerCtx { /// A task set on which to spawn tasks. This is used to coordinate pub(crate) tasks: TaskSet, - /// Tracing information for OpenTelemetry. - pub(crate) tracing: TracingInfo, + /// Tracing information for OpenTelemetry, shared with any `WriteItem`s + /// produced by this context. + pub(crate) tracing: Arc, } impl HandlerCtx { /// Create a new handler context. + #[cfg_attr(not(any(feature = "axum", feature = "pubsub")), allow(dead_code))] pub(crate) const fn new( notifications: Option>, tasks: TaskSet, - tracing: TracingInfo, + tracing: Arc, ) -> Self { Self { notifications, @@ -256,7 +271,7 @@ impl HandlerCtx { Self { notifications: None, tasks: TaskSet::default(), - tracing: TracingInfo::mock(), + tracing: Arc::new(TracingInfo::mock()), } } @@ -272,19 +287,20 @@ impl HandlerCtx { Self { notifications: self.notifications.clone(), tasks: self.tasks.clone(), - tracing: self - .tracing - .child(router, self.notifications_enabled(), parent), + tracing: Arc::new( + self.tracing + .child(router, self.notifications_enabled(), parent), + ), } } /// Get a reference to the tracing information for this handler context. - pub const fn tracing_info(&self) -> &TracingInfo { + pub fn tracing_info(&self) -> &TracingInfo { &self.tracing } /// Get the OpenTelemetry service name for this handler context. - pub const fn service_name(&self) -> &'static str { + pub fn service_name(&self) -> &'static str { self.tracing.service } @@ -296,7 +312,7 @@ impl HandlerCtx { /// Set the tracing information for this handler context. pub fn set_tracing_info(&mut self, tracing: TracingInfo) { - self.tracing = tracing; + self.tracing = Arc::new(tracing); } /// Check if notifications can be sent to the client. This will be false @@ -343,7 +359,7 @@ impl HandlerCtx { let rv = t.into_raw_value()?; notifications .send(WriteItem { - span: self.span().clone(), + tracing: Arc::clone(&self.tracing), json: rv, }) .await?; @@ -363,6 +379,7 @@ impl HandlerCtx { /// is not consumed and this returns `Ok(())` immediately. /// /// [`Stream`]: tokio_stream::Stream + #[cfg(feature = "pubsub")] pub async fn notify_stream(&self, stream: S) -> Result<(), NotifyError> where S: tokio_stream::Stream + Send, @@ -386,7 +403,7 @@ impl HandlerCtx { let permit = self.notifications.as_ref()?.reserve().await.ok()?; Some(NotifyPermit { permit, - span: self.span().clone(), + tracing: Arc::clone(&self.tracing), }) } @@ -404,7 +421,7 @@ impl HandlerCtx { .ok()?; Some(OwnedNotifyPermit { permit, - span: self.span().clone(), + tracing: Arc::clone(&self.tracing), }) } @@ -416,7 +433,7 @@ impl HandlerCtx { let permit = self.notifications.as_ref()?.try_reserve().ok()?; Some(NotifyPermit { permit, - span: self.span().clone(), + tracing: Arc::clone(&self.tracing), }) } @@ -433,7 +450,7 @@ impl HandlerCtx { .ok()?; Some(OwnedNotifyPermit { permit, - span: self.span().clone(), + tracing: Arc::clone(&self.tracing), }) } @@ -444,13 +461,13 @@ impl HandlerCtx { /// Returns `None` if notifications are not enabled. pub async fn permit_many(&self, n: usize) -> Option> { let sender = self.notifications.as_ref()?; - let span = self.span().clone(); + let tracing = Arc::clone(&self.tracing); let mut permits = Vec::with_capacity(n); for _ in 0..n { let permit = sender.clone().reserve_owned().await.ok()?; permits.push(OwnedNotifyPermit { permit, - span: span.clone(), + tracing: Arc::clone(&tracing), }); } Some(permits.into_iter()) @@ -464,13 +481,13 @@ impl HandlerCtx { /// available slots. pub fn try_permit_many(&self, n: usize) -> Option> { let sender = self.notifications.as_ref()?; - let span = self.span().clone(); + let tracing = Arc::clone(&self.tracing); let mut permits = Vec::with_capacity(n); for _ in 0..n { let permit = sender.clone().try_reserve_owned().ok()?; permits.push(OwnedNotifyPermit { permit, - span: span.clone(), + tracing: Arc::clone(&tracing), }); } Some(permits.into_iter()) @@ -511,6 +528,7 @@ impl HandlerCtx { /// was cancelled, and `Some` otherwise. /// /// [`Stream`]: tokio_stream::Stream + #[cfg(feature = "pubsub")] pub fn spawn_notify_stream( &self, stream: S, @@ -693,7 +711,7 @@ impl HandlerArgs { } /// Get the service name for this handler invocation. - pub const fn service_name(&self) -> &'static str { + pub fn service_name(&self) -> &'static str { self.ctx.service_name() } } diff --git a/src/routes/handler.rs b/src/routes/handler.rs index 244d5bc..0fe47d8 100644 --- a/src/routes/handler.rs +++ b/src/routes/handler.rs @@ -213,7 +213,7 @@ pub struct PhantomParams(PhantomData); /// However this still leaves the problem of "bar". There is no way to express /// "bar" unambiguously by reordering method invocations. In this case, you can /// use the [`Params`] and [`State`] wrapper structs to disambiguate the -/// argument type. This should seem familiar to users of [`axum`]. +/// argument type. This should seem familiar to users of `axum`. /// /// ``` /// # use ajj::{Router, Params, State}; diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 4423a7d..68745f0 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,4 +1,6 @@ mod ctx; +#[cfg(feature = "pubsub")] +pub(crate) use ctx::WriteItem; pub use ctx::{HandlerArgs, HandlerCtx, NotifyError, NotifyPermit, OwnedNotifyPermit, TracingInfo}; mod erased;