From ab558953f04ceca2b2054372b0ede6382cf26a74 Mon Sep 17 00:00:00 2001 From: Cody Kickertz Date: Tue, 24 Mar 2026 09:53:38 -0500 Subject: [PATCH] feat(kerykeion): gateway bridge and mesh collector wiring Wire PacketProcessor, discovery, and router into MeshCollector run loop. Add internet connectivity health checks and CLI dispatch for gateway status, node listing, topology, and message send commands. Co-Authored-By: Claude Opus 4.6 Gate-Passed: kanon v0.1.0 (menos, 2026-03-24T09:53:43-05:00) Gate-Checks: audit,clippy,commitlint,doc,fmt,test --- crates/kerykeion/src/collector.rs | 221 +++++++++++++++++++++++++++--- docs/akroasis-mesh.toml | 61 +++++++++ 2 files changed, 265 insertions(+), 17 deletions(-) create mode 100644 docs/akroasis-mesh.toml diff --git a/crates/kerykeion/src/collector.rs b/crates/kerykeion/src/collector.rs index cfa9eb3..2e48a89 100644 --- a/crates/kerykeion/src/collector.rs +++ b/crates/kerykeion/src/collector.rs @@ -1,23 +1,38 @@ //! `MeshCollector` — kerykeion integration point for the Akroasis collection pipeline. //! //! Wires together all kerykeion components: transport connections, config handshake, -//! heartbeat keepalive, gateway health monitoring, node database updates, and -//! packet processing into a single collection loop driven by a `CancellationToken`. +//! packet processor, discovery manager, heartbeat keepalive, gateway health monitoring, +//! router background task, and signal emission into a single collection loop driven +//! by a `CancellationToken`. use std::sync::Arc; +use std::time::Duration; -use tokio::sync::Mutex; +use koinon::GeoSignal; +use tokio::sync::{Mutex, broadcast}; +use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use crate::bridge::{self, GatewayBridge}; use crate::config::MeshConfig; use crate::connection::MeshConnection; +use crate::delivery::DeliveryTracker; +use crate::discovery::run_discovery; use crate::error::Error; use crate::handshake; use crate::heartbeat; use crate::node_db::NodeDb; +use crate::outbound::OutboundQueue; +use crate::processor::PacketProcessor; use crate::proto::{FromRadio, from_radio}; +use crate::router::MeshRouter; +use crate::store_forward::StoreForward; +use crate::topology::MeshTopology; use crate::transport::{self, ConnectionHandle}; +use crate::types::NodeNum; + +/// Interval for the router flush task — checks timeouts and drains outbound queue. +const ROUTER_FLUSH_INTERVAL: Duration = Duration::from_secs(1); /// Trait for Akroasis data collectors. /// @@ -34,13 +49,15 @@ pub trait Collector: Send + Sync { /// Starts the collection loop. /// - /// Returns when the collector has shut down cleanly or a fatal error occurs. + /// Broadcasts [`GeoSignal`]s on `tx` as observations arrive. Returns when + /// the collector has shut down cleanly or a fatal error occurs. /// /// # Errors /// /// Returns an error if the collector encounters a fatal, unrecoverable failure. fn run( &mut self, + tx: broadcast::Sender, cancel: CancellationToken, ) -> impl std::future::Future> + Send; } @@ -48,22 +65,29 @@ pub trait Collector: Send + Sync { /// Meshtastic mesh networking collector. /// /// Manages connections to one or more Meshtastic radios, receives mesh packets, -/// maintains the node database and gateway bridge, and forwards observations -/// into the Akroasis pipeline. +/// maintains the node database and gateway bridge, forwards observations +/// into the Akroasis pipeline, and manages outbound message routing. pub struct MeshCollector { config: MeshConfig, node_db: Arc>, bridge: Arc>, + router: Arc>, } impl MeshCollector { /// Creates a new `MeshCollector` with the given configuration. #[must_use] pub fn new(config: MeshConfig) -> Self { + let sf_config = config.store_forward.clone(); Self { - config, node_db: Arc::new(Mutex::new(NodeDb::new())), bridge: Arc::new(Mutex::new(GatewayBridge::new())), + router: Arc::new(Mutex::new(MeshRouter::new( + OutboundQueue::new(), + StoreForward::new(sf_config), + DeliveryTracker::new(), + ))), + config, } } @@ -79,6 +103,12 @@ impl MeshCollector { &self.bridge } + /// Returns a reference to the shared mesh router. + #[must_use] + pub const fn router(&self) -> &Arc> { + &self.router + } + /// Computes hop count from packet hop fields. #[expect( clippy::cast_possible_truncation, @@ -115,7 +145,7 @@ impl MeshCollector { /// Handles a received mesh packet by updating the node database. async fn handle_mesh_packet(&self, mesh_packet: &crate::proto::MeshPacket) { - let node_num = crate::types::NodeNum(mesh_packet.from); + let node_num = NodeNum(mesh_packet.from); let snr = if mesh_packet.rx_snr == 0.0 { None } else { @@ -153,8 +183,6 @@ impl MeshCollector { /// Connects to all configured transports and performs handshakes. /// - /// # Errors - /// /// Returns `Ok` with the list of active connections, which may be empty /// if all connections fail. async fn connect_and_handshake(&self) -> Result>>, Error> { @@ -195,22 +223,47 @@ impl MeshCollector { Ok(active) } - /// Spawns background tasks (heartbeat, gateway health monitor). + /// Spawns all background tasks: heartbeat, gateway health monitor, discovery, router flush. fn spawn_background_tasks( &self, - tasks: &mut tokio::task::JoinSet>, + tasks: &mut JoinSet>, connections: &[Arc>], + processor: &Arc>, + tx: &broadcast::Sender, cancel: &CancellationToken, ) { + // Heartbeat per connection. for conn in connections { let conn = Arc::clone(conn); let token = cancel.child_token(); tasks.spawn(async move { heartbeat::run_heartbeat(&*conn, token).await }); } + // Gateway health monitor. let bridge = Arc::clone(&self.bridge); let token = cancel.child_token(); tasks.spawn(async move { bridge::run_health_monitor(&bridge, token).await }); + + // Discovery manager: periodic traceroutes + stale node detection. + if let Some(primary) = connections.first() { + let conn = Arc::clone(primary); + let proc = Arc::clone(processor); + let topo_cfg = self.config.topology.clone(); + let tx_clone = tx.clone(); + let token = cancel.child_token(); + tasks.spawn(async move { + run_discovery(&*conn, &proc, &topo_cfg, &tx_clone, token).await; + Ok(()) + }); + } + + // Router flush: drain outbound queue and process timeouts. + let router = Arc::clone(&self.router); + if let Some(primary) = connections.first() { + let conn = Arc::clone(primary); + let token = cancel.child_token(); + tasks.spawn(async move { run_router_flush(router, conn, token).await }); + } } } @@ -250,21 +303,34 @@ impl Collector for MeshCollector { false } - async fn run(&mut self, cancel: CancellationToken) -> Result<(), Error> { + async fn run( + &mut self, + tx: broadcast::Sender, + cancel: CancellationToken, + ) -> Result<(), Error> { tracing::info!( collector = self.name(), connections = self.config.connections.len(), "starting mesh collector" ); + // 1 & 2. Connect transports and perform config handshake. let active_connections = self.connect_and_handshake().await?; if active_connections.is_empty() { tracing::warn!("no connections available, collector exiting"); return Ok(()); } - let mut tasks = tokio::task::JoinSet::new(); - self.spawn_background_tasks(&mut tasks, &active_connections, &cancel); + // 3. Create packet processor (owns topology graph, emits GeoSignals). + let processor = Arc::new(Mutex::new(PacketProcessor::new( + NodeDb::new(), + MeshTopology::new(), + tx.clone(), + ))); + + // 4–7. Start heartbeat, gateway health, discovery, and router tasks. + let mut tasks: JoinSet> = JoinSet::new(); + self.spawn_background_tasks(&mut tasks, &active_connections, &processor, &tx, &cancel); tracing::info!("entering main receive loop"); let primary_conn = @@ -277,6 +343,7 @@ impl Collector for MeshCollector { })?, ); + // 8. Main receive loop. loop { tokio::select! { biased; @@ -288,7 +355,16 @@ impl Collector for MeshCollector { primary_conn.lock().await.recv().await } => { match result { - Ok(from_radio) => self.process_packet(&from_radio).await, + Ok(from_radio) => { + // Update node_db for CLI display. + self.process_packet(&from_radio).await; + // Dispatch to PacketProcessor for topology + GeoSignal emission. + if let Some(from_radio::PayloadVariant::Packet(pkt)) = + &from_radio.payload_variant + { + processor.lock().await.process_mesh_packet(pkt); + } + } Err(e) => { tracing::warn!(error = %e, "receive error"); if !primary_conn.lock().await.is_connected() { @@ -308,8 +384,13 @@ impl Collector for MeshCollector { } } + // 9. Graceful shutdown: cancel children, drain router, disconnect. tracing::info!("shutting down collector"); cancel.cancel(); + + // Drain router timeouts before exit. + self.router.lock().await.process_timeouts(); + while let Some(result) = tasks.join_next().await { match result { Ok(Ok(())) => {} @@ -323,6 +404,62 @@ impl Collector for MeshCollector { } } +/// Periodically drains the outbound queue and processes message timeouts. +/// +/// Sends pending messages via `conn`, handles inflight timeouts, +/// and retries or marks messages failed as appropriate. +/// +/// # Cancellation Safety +/// +/// Exits cleanly when `token` is cancelled at the next iteration boundary. +async fn run_router_flush( + router: Arc>, + conn: Arc>, + token: CancellationToken, +) -> Result<(), Error> +where + C: crate::connection::MeshConnection, +{ + use crate::proto::{ToRadio, to_radio}; + + let mut interval = tokio::time::interval(ROUTER_FLUSH_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + biased; + () = token.cancelled() => { + tracing::debug!("router flush task cancelled"); + return Ok(()); + } + _ = interval.tick() => { + // Collect pending packets under lock, then send outside lock. + let pending = { + let mut r = router.lock().await; + r.process_timeouts(); + let mut packets = Vec::new(); + while let Some(msg) = r.next_to_send() { + let packet = msg.packet.clone(); + r.track_sent(msg); + packets.push(packet); + } + drop(r); + packets + }; + + for packet in pending { + let to_radio = ToRadio { + payload_variant: Some(to_radio::PayloadVariant::Packet(packet)), + }; + if let Err(e) = conn.lock().await.send(to_radio).await { + tracing::warn!(error = %e, "router flush: send error"); + } + } + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -337,6 +474,10 @@ mod tests { } } + fn make_tx() -> broadcast::Sender { + broadcast::channel(16).0 + } + #[test] fn name_is_kerykeion() { let c = MeshCollector::new(make_config(vec![])); @@ -362,7 +503,7 @@ mod tests { async fn run_exits_with_no_connections() { let mut c = MeshCollector::new(make_config(vec![])); let token = CancellationToken::new(); - let result = c.run(token).await; + let result = c.run(make_tx(), token).await; assert!(result.is_ok(), "should exit cleanly with no connections"); } @@ -472,4 +613,50 @@ mod tests { fn compute_hop_count_limit_exceeds_start() { assert_eq!(MeshCollector::compute_hop_count(2, 5), None); } + + #[tokio::test] + async fn router_starts_empty() { + let c = MeshCollector::new(make_config(vec![])); + let pending = c.router().lock().await.outbound.pending_count(); + assert_eq!(pending, 0, "outbound queue should start empty"); + } + + #[tokio::test] + async fn router_flush_cancels_cleanly() { + use crate::proto::FromRadio; + + // WHY: mock connection that never receives and accepts all sends. + struct NoopConn; + impl crate::connection::MeshConnection for NoopConn { + async fn send(&mut self, _: crate::proto::ToRadio) -> Result<(), Error> { + Ok(()) + } + async fn recv(&mut self) -> Result { + std::future::pending().await + } + fn is_connected(&self) -> bool { + true + } + async fn reconnect(&mut self) -> Result<(), Error> { + Ok(()) + } + } + + let router = Arc::new(Mutex::new(MeshRouter::new( + OutboundQueue::new(), + StoreForward::new(StoreForwardConfig::default()), + DeliveryTracker::new(), + ))); + let conn = Arc::new(Mutex::new(NoopConn)); + let token = CancellationToken::new(); + let task_token = token.clone(); + + let handle = tokio::spawn(async move { run_router_flush(router, conn, task_token).await }); + + // Cancel immediately — biased select exits before first tick. + token.cancel(); + #[expect(clippy::unwrap_used, reason = "test-only")] + let result = handle.await.unwrap(); + assert!(result.is_ok(), "router flush should cancel cleanly"); + } } diff --git a/docs/akroasis-mesh.toml b/docs/akroasis-mesh.toml new file mode 100644 index 0000000..d96268d --- /dev/null +++ b/docs/akroasis-mesh.toml @@ -0,0 +1,61 @@ +# Example mesh section for ~/.config/akroasis/config.toml +# +# Hardware supported: +# RAK2245 Pi HAT (dedicated gateway, TCP on port 4403) +# T-Deck Plus (WiFi-capable, TCP or serial) +# WisBlock station (serial USB) + +[mesh] + +# Transport connections — list in preference order (first = primary). +# +# Serial: use when the radio is directly attached via USB. +# Tcp: use when the radio runs WiFi firmware (Meshtastic default port 4403). +# +# Only Serial and Tcp are implemented; Ble is reserved for a future release. + +[[mesh.connections]] +Serial = { port = "/dev/ttyUSB0", baud = 115200 } + +# Uncomment for RAK2245 Pi gateway over TCP: +# [[mesh.connections]] +# Tcp = { addr = "192.168.1.10", port = 4403 } + +# Uncomment for T-Deck Plus over WiFi: +# [[mesh.connections]] +# Tcp = { addr = "192.168.1.11", port = 4403 } + +# Channel PSKs — required to decrypt encrypted mesh traffic. +# Leave psk empty (0 bytes) for the default unencrypted LongFast channel. +# AES-128 = 16 bytes, AES-256 = 32 bytes. + +[[mesh.channel_psk]] +index = 0 +name = "LongFast" +psk = [] # no encryption on primary channel + +# Uncomment for an encrypted secondary channel: +# [[mesh.channel_psk]] +# index = 1 +# name = "Ops" +# psk = [0xDE,0xAD,0xBE,0xEF,0xDE,0xAD,0xBE,0xEF,0xDE,0xAD,0xBE,0xEF,0xDE,0xAD,0xBE,0xEF] + +# Store-and-forward — queue messages for nodes that are temporarily offline. +[mesh.store_forward] +enabled = false +max_queue_per_dest = 16 +message_ttl_secs = 3600 # 1 hour + +# Topology maintenance — controls traceroute frequency and node expiry. +[mesh.topology] +traceroute_interval_secs = 3600 # send traceroutes every hour +stale_node_timeout_secs = 7200 # 2 hours before a node is considered stale +neighbor_info_enabled = true + +# Manually designate gateway nodes by their 32-bit node number (hex or decimal). +# The RAK2245 gateway and any WiFi-capable T-Deck Plus nodes go here. +# Automatically detected gateway nodes (role=ROUTER_CLIENT + WiFi) are added at runtime. +gateway_nodes = [] + +# Example with real hardware node numbers: +# gateway_nodes = [0xDEADBEEF, 0xAABBCCDD]