diff --git a/crates/kerykeion/src/collector.rs b/crates/kerykeion/src/collector.rs index cfa9eb3..2e48a89 100644 --- a/crates/kerykeion/src/collector.rs +++ b/crates/kerykeion/src/collector.rs @@ -1,23 +1,38 @@ //! `MeshCollector` — kerykeion integration point for the Akroasis collection pipeline. //! //! Wires together all kerykeion components: transport connections, config handshake, -//! heartbeat keepalive, gateway health monitoring, node database updates, and -//! packet processing into a single collection loop driven by a `CancellationToken`. +//! packet processor, discovery manager, heartbeat keepalive, gateway health monitoring, +//! router background task, and signal emission into a single collection loop driven +//! by a `CancellationToken`. use std::sync::Arc; +use std::time::Duration; -use tokio::sync::Mutex; +use koinon::GeoSignal; +use tokio::sync::{Mutex, broadcast}; +use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use crate::bridge::{self, GatewayBridge}; use crate::config::MeshConfig; use crate::connection::MeshConnection; +use crate::delivery::DeliveryTracker; +use crate::discovery::run_discovery; use crate::error::Error; use crate::handshake; use crate::heartbeat; use crate::node_db::NodeDb; +use crate::outbound::OutboundQueue; +use crate::processor::PacketProcessor; use crate::proto::{FromRadio, from_radio}; +use crate::router::MeshRouter; +use crate::store_forward::StoreForward; +use crate::topology::MeshTopology; use crate::transport::{self, ConnectionHandle}; +use crate::types::NodeNum; + +/// Interval for the router flush task — checks timeouts and drains outbound queue. +const ROUTER_FLUSH_INTERVAL: Duration = Duration::from_secs(1); /// Trait for Akroasis data collectors. /// @@ -34,13 +49,15 @@ pub trait Collector: Send + Sync { /// Starts the collection loop. /// - /// Returns when the collector has shut down cleanly or a fatal error occurs. + /// Broadcasts [`GeoSignal`]s on `tx` as observations arrive. 
Returns when + /// the collector has shut down cleanly or a fatal error occurs. /// /// # Errors /// /// Returns an error if the collector encounters a fatal, unrecoverable failure. fn run( &mut self, + tx: broadcast::Sender, cancel: CancellationToken, ) -> impl std::future::Future> + Send; } @@ -48,22 +65,29 @@ pub trait Collector: Send + Sync { /// Meshtastic mesh networking collector. /// /// Manages connections to one or more Meshtastic radios, receives mesh packets, -/// maintains the node database and gateway bridge, and forwards observations -/// into the Akroasis pipeline. +/// maintains the node database and gateway bridge, forwards observations +/// into the Akroasis pipeline, and manages outbound message routing. pub struct MeshCollector { config: MeshConfig, node_db: Arc>, bridge: Arc>, + router: Arc>, } impl MeshCollector { /// Creates a new `MeshCollector` with the given configuration. #[must_use] pub fn new(config: MeshConfig) -> Self { + let sf_config = config.store_forward.clone(); Self { - config, node_db: Arc::new(Mutex::new(NodeDb::new())), bridge: Arc::new(Mutex::new(GatewayBridge::new())), + router: Arc::new(Mutex::new(MeshRouter::new( + OutboundQueue::new(), + StoreForward::new(sf_config), + DeliveryTracker::new(), + ))), + config, } } @@ -79,6 +103,12 @@ impl MeshCollector { &self.bridge } + /// Returns a reference to the shared mesh router. + #[must_use] + pub const fn router(&self) -> &Arc> { + &self.router + } + /// Computes hop count from packet hop fields. #[expect( clippy::cast_possible_truncation, @@ -115,7 +145,7 @@ impl MeshCollector { /// Handles a received mesh packet by updating the node database. 
async fn handle_mesh_packet(&self, mesh_packet: &crate::proto::MeshPacket) { - let node_num = crate::types::NodeNum(mesh_packet.from); + let node_num = NodeNum(mesh_packet.from); let snr = if mesh_packet.rx_snr == 0.0 { None } else { @@ -153,8 +183,6 @@ impl MeshCollector { /// Connects to all configured transports and performs handshakes. /// - /// # Errors - /// /// Returns `Ok` with the list of active connections, which may be empty /// if all connections fail. async fn connect_and_handshake(&self) -> Result>>, Error> { @@ -195,22 +223,47 @@ impl MeshCollector { Ok(active) } - /// Spawns background tasks (heartbeat, gateway health monitor). + /// Spawns all background tasks: heartbeat, gateway health monitor, discovery, router flush. fn spawn_background_tasks( &self, - tasks: &mut tokio::task::JoinSet>, + tasks: &mut JoinSet>, connections: &[Arc>], + processor: &Arc>, + tx: &broadcast::Sender, cancel: &CancellationToken, ) { + // Heartbeat per connection. for conn in connections { let conn = Arc::clone(conn); let token = cancel.child_token(); tasks.spawn(async move { heartbeat::run_heartbeat(&*conn, token).await }); } + // Gateway health monitor. let bridge = Arc::clone(&self.bridge); let token = cancel.child_token(); tasks.spawn(async move { bridge::run_health_monitor(&bridge, token).await }); + + // Discovery manager: periodic traceroutes + stale node detection. + if let Some(primary) = connections.first() { + let conn = Arc::clone(primary); + let proc = Arc::clone(processor); + let topo_cfg = self.config.topology.clone(); + let tx_clone = tx.clone(); + let token = cancel.child_token(); + tasks.spawn(async move { + run_discovery(&*conn, &proc, &topo_cfg, &tx_clone, token).await; + Ok(()) + }); + } + + // Router flush: drain outbound queue and process timeouts. 
+ let router = Arc::clone(&self.router); + if let Some(primary) = connections.first() { + let conn = Arc::clone(primary); + let token = cancel.child_token(); + tasks.spawn(async move { run_router_flush(router, conn, token).await }); + } } } @@ -250,21 +303,34 @@ impl Collector for MeshCollector { false } - async fn run(&mut self, cancel: CancellationToken) -> Result<(), Error> { + async fn run( + &mut self, + tx: broadcast::Sender, + cancel: CancellationToken, + ) -> Result<(), Error> { tracing::info!( collector = self.name(), connections = self.config.connections.len(), "starting mesh collector" ); + // 1 & 2. Connect transports and perform config handshake. let active_connections = self.connect_and_handshake().await?; if active_connections.is_empty() { tracing::warn!("no connections available, collector exiting"); return Ok(()); } - let mut tasks = tokio::task::JoinSet::new(); - self.spawn_background_tasks(&mut tasks, &active_connections, &cancel); + // 3. Create packet processor (owns topology graph, emits GeoSignals). NOTE(review): it is handed a fresh `NodeDb::new()`, separate from `self.node_db` — confirm the two databases are not expected to stay in sync. + let processor = Arc::new(Mutex::new(PacketProcessor::new( + NodeDb::new(), + MeshTopology::new(), + tx.clone(), + ))); + + // 4–7. Start heartbeat, gateway health, discovery, and router tasks. + let mut tasks: JoinSet> = JoinSet::new(); + self.spawn_background_tasks(&mut tasks, &active_connections, &processor, &tx, &cancel); tracing::info!("entering main receive loop"); let primary_conn = @@ -277,6 +343,7 @@ impl Collector for MeshCollector { })?, ); + // 8. Main receive loop. loop { tokio::select! { biased; @@ -288,7 +355,16 @@ impl Collector for MeshCollector { primary_conn.lock().await.recv().await } => { match result { - Ok(from_radio) => self.process_packet(&from_radio).await, + Ok(from_radio) => { + // Update node_db for CLI display. + self.process_packet(&from_radio).await; + // Dispatch to PacketProcessor for topology + GeoSignal emission. 
+ if let Some(from_radio::PayloadVariant::Packet(pkt)) = + &from_radio.payload_variant + { + processor.lock().await.process_mesh_packet(pkt); + } + } Err(e) => { tracing::warn!(error = %e, "receive error"); if !primary_conn.lock().await.is_connected() { @@ -308,8 +384,13 @@ impl Collector for MeshCollector { } } + // 9. Graceful shutdown: cancel children, drain router, disconnect. tracing::info!("shutting down collector"); cancel.cancel(); + + // Drain router timeouts before exit. + self.router.lock().await.process_timeouts(); + while let Some(result) = tasks.join_next().await { match result { Ok(Ok(())) => {} @@ -323,6 +404,62 @@ impl Collector for MeshCollector { } } +/// Periodically drains the outbound queue and processes message timeouts. +/// +/// Sends pending messages via `conn`, handles inflight timeouts, +/// and retries or marks messages failed as appropriate. +/// +/// # Cancellation Safety +/// +/// Exits cleanly when `token` is cancelled; cancellation is observed only while waiting in the `select!`, so a send batch already in progress finishes first. +async fn run_router_flush( + router: Arc>, + conn: Arc>, + token: CancellationToken, +) -> Result<(), Error> +where + C: crate::connection::MeshConnection, +{ + use crate::proto::{ToRadio, to_radio}; + + let mut interval = tokio::time::interval(ROUTER_FLUSH_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + biased; + () = token.cancelled() => { + tracing::debug!("router flush task cancelled"); + return Ok(()); + } + _ = interval.tick() => { + // Collect pending packets under lock, then send outside lock. Each message is passed to `track_sent` before the write is attempted, and a failed send is only logged. 
+ let pending = { + let mut r = router.lock().await; + r.process_timeouts(); + let mut packets = Vec::new(); + while let Some(msg) = r.next_to_send() { + let packet = msg.packet.clone(); + r.track_sent(msg); + packets.push(packet); + } + drop(r); + packets + }; + + for packet in pending { + let to_radio = ToRadio { + payload_variant: Some(to_radio::PayloadVariant::Packet(packet)), + }; + if let Err(e) = conn.lock().await.send(to_radio).await { + tracing::warn!(error = %e, "router flush: send error"); + } + } + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -337,6 +474,10 @@ mod tests { } } + fn make_tx() -> broadcast::Sender { + broadcast::channel(16).0 + } + #[test] fn name_is_kerykeion() { let c = MeshCollector::new(make_config(vec![])); @@ -362,7 +503,7 @@ mod tests { async fn run_exits_with_no_connections() { let mut c = MeshCollector::new(make_config(vec![])); let token = CancellationToken::new(); - let result = c.run(token).await; + let result = c.run(make_tx(), token).await; assert!(result.is_ok(), "should exit cleanly with no connections"); } @@ -472,4 +613,50 @@ mod tests { fn compute_hop_count_limit_exceeds_start() { assert_eq!(MeshCollector::compute_hop_count(2, 5), None); } + + #[tokio::test] + async fn router_starts_empty() { + let c = MeshCollector::new(make_config(vec![])); + let pending = c.router().lock().await.outbound.pending_count(); + assert_eq!(pending, 0, "outbound queue should start empty"); + } + + #[tokio::test] + async fn router_flush_cancels_cleanly() { + use crate::proto::FromRadio; + + // WHY: mock connection that never receives and accepts all sends. 
+ struct NoopConn; + impl crate::connection::MeshConnection for NoopConn { + async fn send(&mut self, _: crate::proto::ToRadio) -> Result<(), Error> { + Ok(()) + } + async fn recv(&mut self) -> Result { + std::future::pending().await + } + fn is_connected(&self) -> bool { + true + } + async fn reconnect(&mut self) -> Result<(), Error> { + Ok(()) + } + } + + let router = Arc::new(Mutex::new(MeshRouter::new( + OutboundQueue::new(), + StoreForward::new(StoreForwardConfig::default()), + DeliveryTracker::new(), + ))); + let conn = Arc::new(Mutex::new(NoopConn)); + let token = CancellationToken::new(); + let task_token = token.clone(); + + let handle = tokio::spawn(async move { run_router_flush(router, conn, task_token).await }); + + // Cancel immediately — biased select exits before first tick. + token.cancel(); + #[expect(clippy::unwrap_used, reason = "test-only")] + let result = handle.await.unwrap(); + assert!(result.is_ok(), "router flush should cancel cleanly"); + } } diff --git a/docs/akroasis-mesh.toml b/docs/akroasis-mesh.toml new file mode 100644 index 0000000..d96268d --- /dev/null +++ b/docs/akroasis-mesh.toml @@ -0,0 +1,61 @@ +# Example mesh section for ~/.config/akroasis/config.toml +# +# Hardware supported: +# RAK2245 Pi HAT (dedicated gateway, TCP on port 4403) +# T-Deck Plus (WiFi-capable, TCP or serial) +# WisBlock station (serial USB) + +[mesh] + +# Transport connections — list in preference order (first = primary). +# +# Serial: use when the radio is directly attached via USB. +# Tcp: use when the radio runs WiFi firmware (Meshtastic default port 4403). +# +# Only Serial and Tcp are implemented; Ble is reserved for a future release. 
+ +[[mesh.connections]] +Serial = { port = "/dev/ttyUSB0", baud = 115200 } + +# Uncomment for RAK2245 Pi gateway over TCP: +# [[mesh.connections]] +# Tcp = { addr = "192.168.1.10", port = 4403 } + +# Uncomment for T-Deck Plus over WiFi: +# [[mesh.connections]] +# Tcp = { addr = "192.168.1.11", port = 4403 } + +# Channel PSKs — required to decrypt encrypted mesh traffic. +# Leave psk empty (0 bytes) for the default unencrypted LongFast channel. +# AES-128 = 16 bytes, AES-256 = 32 bytes. + +[[mesh.channel_psk]] +index = 0 +name = "LongFast" +psk = [] # no encryption on primary channel + +# Uncomment for an encrypted secondary channel: +# [[mesh.channel_psk]] +# index = 1 +# name = "Ops" +# psk = [0xDE,0xAD,0xBE,0xEF,0xDE,0xAD,0xBE,0xEF,0xDE,0xAD,0xBE,0xEF,0xDE,0xAD,0xBE,0xEF] + +# Store-and-forward — queue messages for nodes that are temporarily offline. +[mesh.store_forward] +enabled = false +max_queue_per_dest = 16 +message_ttl_secs = 3600 # 1 hour + +# Topology maintenance — controls traceroute frequency and node expiry. +[mesh.topology] +traceroute_interval_secs = 3600 # send traceroutes every hour +stale_node_timeout_secs = 7200 # 2 hours before a node is considered stale +neighbor_info_enabled = true + +# Manually designate gateway nodes by their 32-bit node number (hex or decimal). +# The RAK2245 gateway and any WiFi-capable T-Deck Plus nodes go here. +# Automatically detected gateway nodes (role=ROUTER_CLIENT and WiFi) are added at runtime. NOTE: as placed, this key follows the [mesh.topology] header, so TOML scopes it to mesh.topology; if it is meant to be a top-level [mesh] key, move it directly under the [mesh] header, before [[mesh.connections]]. +gateway_nodes = [] + +# Example (placeholder values; replace with your hardware's node numbers): +# gateway_nodes = [0xDEADBEEF, 0xAABBCCDD]