Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ where
HandlerCtx::new(
None,
self.task_set.clone(),
TracingInfo::new_with_context(self.router.service_name(), parent_context),
Arc::new(TracingInfo::new_with_context(
self.router.service_name(),
parent_context,
)),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
//!
//! Routers can also be served over axum websockets. When both `axum` and
//! `pubsub` features are enabled, the `pubsub` module provides
//! [`pubsub::AxumWsCfg`] and the [`pubsub::ajj_websocket`] axum handler. This
//! `pubsub::AxumWsCfg` and the `pubsub::ajj_websocket` axum handler. This
//! handler will serve the router over websockets at a specific route. The
//! router is a property of the `AxumWsCfg` object, and is passed to the
//! handler via axum's `State` extractor.
Expand Down
1 change: 1 addition & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ macro_rules! unwrap_infallible {
/// should rely on the histogram metric for size visibility.
///
/// See <https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events>
#[cfg(any(feature = "axum", feature = "pubsub"))]
macro_rules! message_event {
($type:literal, service: $service:expr, counter: $counter:expr, bytes: $bytes:expr,) => {{
let bytes = $bytes;
Expand Down
6 changes: 5 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use metrics::{counter, gauge, histogram, Counter, Gauge, Histogram};
use metrics::{counter, gauge, Counter, Gauge};
#[cfg(any(feature = "axum", feature = "pubsub"))]
use metrics::{histogram, Histogram};
use std::sync::LazyLock;

/// Metric name for counting router calls.
Expand Down Expand Up @@ -256,6 +258,7 @@ fn record_completed_call(service_name: &'static str, method: &str) {
}

/// Get or register a histogram for message sizes on a specific service and direction.
#[cfg(any(feature = "axum", feature = "pubsub"))]
fn message_size(service_name: &'static str, direction: &'static str) -> Histogram {
let _ = &DESCRIBE;
histogram!(
Expand All @@ -266,6 +269,7 @@ fn message_size(service_name: &'static str, direction: &'static str) -> Histogra
}

/// Record the uncompressed size in bytes of a message sent or received on a service.
#[cfg(any(feature = "axum", feature = "pubsub"))]
pub(crate) fn record_message_size(
service_name: &'static str,
direction: &'static str,
Expand Down
1 change: 0 additions & 1 deletion src/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ pub mod ipc {
}

mod shared;
pub(crate) use shared::WriteItem;
pub use shared::{ConnectionId, DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT};

mod shutdown;
Expand Down
31 changes: 10 additions & 21 deletions src/pubsub/shared.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{
pubsub::{In, JsonSink, Listener, Out},
routes::WriteItem,
types::InboundData,
HandlerCtx, TaskSet, TracingInfo,
};
use core::fmt;
use serde_json::value::RawValue;
use std::sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
Arc,
Expand Down Expand Up @@ -148,7 +148,6 @@ impl ConnectionManager {
items: rx,
connection,
tx_msg_id: self.tx_msg_id.clone(),
service_name: self.router.service_name(),
};

(rt, wt)
Expand Down Expand Up @@ -259,7 +258,7 @@ where
// possible, and then given to the Handler ctx. It
// will be populated with request-specific details
// (e.g. method) during ctx instantiation.
let tracing = TracingInfo::new(router.service_name());
let tracing = Arc::new(TracingInfo::new(router.service_name()));

let ctx =
HandlerCtx::new(
Expand All @@ -269,11 +268,11 @@ where
);
ctx.init_request_span(&router, None);

let span = ctx.span().clone();
span.in_scope(|| {
let tracing_for_reply = Arc::clone(&ctx.tracing);
ctx.span().in_scope(|| {
message_event!(
@received,
service: router.service_name(),
service: tracing_for_reply.service,
counter: &rx_msg_id,
bytes: item_bytes,
);
Expand All @@ -289,7 +288,7 @@ where
// as the task is done regardless.
if let Some(json) = fut.await {
let _ = permit.send(
WriteItem { span, json }
WriteItem { tracing: tracing_for_reply, json }
);
}
}
Expand All @@ -310,13 +309,6 @@ where
}
}

/// An item to be written to an outbound JSON pubsub stream.
#[derive(Debug, Clone)]
pub(crate) struct WriteItem {
pub(crate) span: tracing::Span,
pub(crate) json: Box<RawValue>,
}

/// The Write Task is responsible for writing JSON to the outbound connection.
struct WriteTask<T: Listener> {
/// Task set
Expand All @@ -336,9 +328,6 @@ struct WriteTask<T: Listener> {

/// Counter for OTEL messages sent.
pub(crate) tx_msg_id: Arc<AtomicU32>,

/// Service name, used to tag the `ajj.router.message_size_bytes` histogram.
pub(crate) service_name: &'static str,
}

impl<T: Listener> WriteTask<T> {
Expand All @@ -356,7 +345,6 @@ impl<T: Listener> WriteTask<T> {
mut items,
mut connection,
tx_msg_id,
service_name,
..
} = self;

Expand All @@ -369,14 +357,15 @@ impl<T: Listener> WriteTask<T> {
break;
}
item = items.recv() => {
let Some(WriteItem { span, json }) = item else {
tracing::error!("Json stream has closed");
let Some(WriteItem { tracing, json }) = item else {
::tracing::error!("Json stream has closed");
break;
};
let span = tracing.request_span().clone();
span.in_scope(|| {
message_event!(
@sent,
service: service_name,
service: tracing.service,
counter: &tx_msg_id,
bytes: json.get().len(),
);
Expand Down
Loading
Loading