diff --git a/native/rust/dimos-module/src/module.rs b/native/rust/dimos-module/src/module.rs index 9746aec5a1..6a71008c7f 100644 --- a/native/rust/dimos-module/src/module.rs +++ b/native/rust/dimos-module/src/module.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; use std::fmt::Debug; use std::io; -use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::sync::mpsc; @@ -9,8 +11,10 @@ use serde::de::DeserializeOwned; use crate::transport::Transport; -const INPUT_CHANNEL_CAPACITY: usize = 16; -const PUBLISH_CHANNEL_CAPACITY: usize = 64; +const INPUT_CHANNEL_CAPACITY: usize = 1024; +const PUBLISH_CHANNEL_CAPACITY: usize = 1024; +/// Maximum drop-warning log lines per second per route. +const MAX_ERROR_LOG_RATE: u32 = 1; // Each input() call produces a TypedRoute that decodes its message type // and forwards it to the right Input's mpsc channel. @@ -22,15 +26,33 @@ struct TypedRoute { topic: String, decode: fn(&[u8]) -> io::Result, sender: mpsc::Sender, + drop_count: AtomicU64, + last_log: Mutex>, } impl Route for TypedRoute { fn try_dispatch(&self, data: &[u8]) { match (self.decode)(data) { - // If the input channel is full, the newest message is dropped. - Ok(msg) => { - let _ = self.sender.try_send(msg); - } + Ok(msg) => match self.sender.try_send(msg) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + let n = self.drop_count.fetch_add(1, Ordering::Relaxed) + 1; + let interval = Duration::from_secs(1) / MAX_ERROR_LOG_RATE; + let mut last = self.last_log.lock().unwrap(); + let now = Instant::now(); + if last + .map(|t| now.duration_since(t) >= interval) + .unwrap_or(true) + { + *last = Some(now); + eprintln!( + "dimos_module: input '{}' dropped {} message(s) — handler can't keep up (queue cap = {})", + self.topic, n, INPUT_CHANNEL_CAPACITY, + ); + } + } + Err(mpsc::error::TrySendError::Closed(_)) => {} + }, Err(e) => eprintln!("dimos_module: decode error on {}: {e}", self.topic), } } @@ -149,6 +171,8 @@ impl Builder { topic: topic.clone(), decode, sender: tx, + drop_count: AtomicU64::new(0), + last_log: Mutex::new(None), })); Input { topic, @@ -564,4 +588,24 @@ mod tests { fn ok_does_not_panic() { propagate_task_failure("recv", Ok(())); } + + #[test] + fn typed_route_logs_error_on_drop() { + let (tx, _rx) = mpsc::channel::>(1); + let route = TypedRoute { + topic: "/test".to_string(), + decode: |b| Ok(b.to_vec()), + sender: tx, + drop_count: AtomicU64::new(0), + last_log: Mutex::new(None), + }; + // Fill the queue, then force a drop. + route.try_dispatch(&[1u8]); + route.try_dispatch(&[1u8]); + assert_eq!(route.drop_count.load(Ordering::Relaxed), 1); + assert!( + route.last_log.lock().unwrap().is_some(), + "drop must trigger a log", + ); + } }