Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 204 additions & 17 deletions crates/kerykeion/src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
//! `MeshCollector` — kerykeion integration point for the Akroasis collection pipeline.
//!
//! Wires together all kerykeion components: transport connections, config handshake,
//! heartbeat keepalive, gateway health monitoring, node database updates, and
//! packet processing into a single collection loop driven by a `CancellationToken`.
//! packet processor, discovery manager, heartbeat keepalive, gateway health monitoring,
//! router background task, and signal emission into a single collection loop driven
//! by a `CancellationToken`.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex;
use koinon::GeoSignal;
use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

use crate::bridge::{self, GatewayBridge};
use crate::config::MeshConfig;
use crate::connection::MeshConnection;
use crate::delivery::DeliveryTracker;
use crate::discovery::run_discovery;
use crate::error::Error;
use crate::handshake;
use crate::heartbeat;
use crate::node_db::NodeDb;
use crate::outbound::OutboundQueue;
use crate::processor::PacketProcessor;
use crate::proto::{FromRadio, from_radio};
use crate::router::MeshRouter;
use crate::store_forward::StoreForward;
use crate::topology::MeshTopology;
use crate::transport::{self, ConnectionHandle};
use crate::types::NodeNum;

/// Interval for the router flush task — checks timeouts and drains outbound queue.
const ROUTER_FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Trait for Akroasis data collectors.
///
Expand All @@ -34,36 +49,45 @@ pub trait Collector: Send + Sync {

/// Starts the collection loop.
///
/// Returns when the collector has shut down cleanly or a fatal error occurs.
/// Broadcasts [`GeoSignal`]s on `tx` as observations arrive. Returns when
/// the collector has shut down cleanly or a fatal error occurs.
///
/// # Errors
///
/// Returns an error if the collector encounters a fatal, unrecoverable failure.
fn run(
&mut self,
tx: broadcast::Sender<GeoSignal>,
cancel: CancellationToken,
) -> impl std::future::Future<Output = Result<(), Error>> + Send;
}

/// Meshtastic mesh networking collector.
///
/// Manages connections to one or more Meshtastic radios, receives mesh packets,
/// maintains the node database and gateway bridge, and forwards observations
/// into the Akroasis pipeline.
/// maintains the node database and gateway bridge, forwards observations
/// into the Akroasis pipeline, and manages outbound message routing.
pub struct MeshCollector {
config: MeshConfig,
node_db: Arc<Mutex<NodeDb>>,
bridge: Arc<Mutex<GatewayBridge>>,
router: Arc<Mutex<MeshRouter>>,
}

impl MeshCollector {
/// Creates a new `MeshCollector` with the given configuration.
#[must_use]
pub fn new(config: MeshConfig) -> Self {
let sf_config = config.store_forward.clone();
Self {
config,
node_db: Arc::new(Mutex::new(NodeDb::new())),
bridge: Arc::new(Mutex::new(GatewayBridge::new())),
router: Arc::new(Mutex::new(MeshRouter::new(
OutboundQueue::new(),
StoreForward::new(sf_config),
DeliveryTracker::new(),
))),
config,
}
}

Expand All @@ -79,6 +103,12 @@ impl MeshCollector {
&self.bridge
}

/// Returns a reference to the shared mesh router.
#[must_use]
pub const fn router(&self) -> &Arc<Mutex<MeshRouter>> {
&self.router
}

/// Computes hop count from packet hop fields.
#[expect(
clippy::cast_possible_truncation,
Expand Down Expand Up @@ -115,7 +145,7 @@ impl MeshCollector {

/// Handles a received mesh packet by updating the node database.
async fn handle_mesh_packet(&self, mesh_packet: &crate::proto::MeshPacket) {
let node_num = crate::types::NodeNum(mesh_packet.from);
let node_num = NodeNum(mesh_packet.from);
let snr = if mesh_packet.rx_snr == 0.0 {
None
} else {
Expand Down Expand Up @@ -153,8 +183,6 @@ impl MeshCollector {

/// Connects to all configured transports and performs handshakes.
///
/// # Errors
///
/// Returns `Ok` with the list of active connections, which may be empty
/// if all connections fail.
async fn connect_and_handshake(&self) -> Result<Vec<Arc<Mutex<ConnectionHandle>>>, Error> {
Expand Down Expand Up @@ -195,22 +223,47 @@ impl MeshCollector {
Ok(active)
}

/// Spawns background tasks (heartbeat, gateway health monitor).
/// Spawns all background tasks: heartbeat, gateway health monitor, discovery, router flush.
fn spawn_background_tasks(
&self,
tasks: &mut tokio::task::JoinSet<Result<(), Error>>,
tasks: &mut JoinSet<Result<(), Error>>,
connections: &[Arc<Mutex<ConnectionHandle>>],
processor: &Arc<Mutex<PacketProcessor>>,
tx: &broadcast::Sender<GeoSignal>,
cancel: &CancellationToken,
) {
// Heartbeat per connection.
for conn in connections {
let conn = Arc::clone(conn);
let token = cancel.child_token();
tasks.spawn(async move { heartbeat::run_heartbeat(&*conn, token).await });
}

// Gateway health monitor.
let bridge = Arc::clone(&self.bridge);
let token = cancel.child_token();
tasks.spawn(async move { bridge::run_health_monitor(&bridge, token).await });

// Discovery manager: periodic traceroutes + stale node detection.
if let Some(primary) = connections.first() {
let conn = Arc::clone(primary);
let proc = Arc::clone(processor);
let topo_cfg = self.config.topology.clone();
let tx_clone = tx.clone();
let token = cancel.child_token();
tasks.spawn(async move {
run_discovery(&*conn, &proc, &topo_cfg, &tx_clone, token).await;
Ok(())
});
}

// Router flush: drain outbound queue and process timeouts.
let router = Arc::clone(&self.router);
if let Some(primary) = connections.first() {
let conn = Arc::clone(primary);
let token = cancel.child_token();
tasks.spawn(async move { run_router_flush(router, conn, token).await });
}
}
}

Expand Down Expand Up @@ -250,21 +303,34 @@ impl Collector for MeshCollector {
false
}

async fn run(&mut self, cancel: CancellationToken) -> Result<(), Error> {
async fn run(
&mut self,
tx: broadcast::Sender<GeoSignal>,
cancel: CancellationToken,
) -> Result<(), Error> {
tracing::info!(
collector = self.name(),
connections = self.config.connections.len(),
"starting mesh collector"
);

// 1 & 2. Connect transports and perform config handshake.
let active_connections = self.connect_and_handshake().await?;
if active_connections.is_empty() {
tracing::warn!("no connections available, collector exiting");
return Ok(());
}

let mut tasks = tokio::task::JoinSet::new();
self.spawn_background_tasks(&mut tasks, &active_connections, &cancel);
// 3. Create packet processor (owns topology graph, emits GeoSignals).
let processor = Arc::new(Mutex::new(PacketProcessor::new(
NodeDb::new(),
MeshTopology::new(),
tx.clone(),
)));

// 4–7. Start heartbeat, gateway health, discovery, and router tasks.
let mut tasks: JoinSet<Result<(), Error>> = JoinSet::new();
self.spawn_background_tasks(&mut tasks, &active_connections, &processor, &tx, &cancel);

tracing::info!("entering main receive loop");
let primary_conn =
Expand All @@ -277,6 +343,7 @@ impl Collector for MeshCollector {
})?,
);

// 8. Main receive loop.
loop {
tokio::select! {
biased;
Expand All @@ -288,7 +355,16 @@ impl Collector for MeshCollector {
primary_conn.lock().await.recv().await
} => {
match result {
Ok(from_radio) => self.process_packet(&from_radio).await,
Ok(from_radio) => {
// Update node_db for CLI display.
self.process_packet(&from_radio).await;
// Dispatch to PacketProcessor for topology + GeoSignal emission.
if let Some(from_radio::PayloadVariant::Packet(pkt)) =
&from_radio.payload_variant
{
processor.lock().await.process_mesh_packet(pkt);
}
}
Err(e) => {
tracing::warn!(error = %e, "receive error");
if !primary_conn.lock().await.is_connected() {
Expand All @@ -308,8 +384,13 @@ impl Collector for MeshCollector {
}
}

// 9. Graceful shutdown: cancel children, drain router, disconnect.
tracing::info!("shutting down collector");
cancel.cancel();

// Drain router timeouts before exit.
self.router.lock().await.process_timeouts();

while let Some(result) = tasks.join_next().await {
match result {
Ok(Ok(())) => {}
Expand All @@ -323,6 +404,62 @@ impl Collector for MeshCollector {
}
}

/// Background task that periodically flushes the mesh router.
///
/// On every tick: processes inflight-message timeouts, pops every message the
/// router has queued for transmission, marks each as sent, and pushes the
/// resulting packets to the radio via `conn`. Send failures are logged and
/// the remaining packets are still attempted.
///
/// # Cancellation Safety
///
/// Returns `Ok(())` as soon as `token` is observed cancelled; the `biased`
/// select guarantees cancellation is checked before a pending tick fires.
async fn run_router_flush<C>(
    router: Arc<Mutex<MeshRouter>>,
    conn: Arc<Mutex<C>>,
    token: CancellationToken,
) -> Result<(), Error>
where
    C: crate::connection::MeshConnection,
{
    use crate::proto::{ToRadio, to_radio};

    let mut ticker = tokio::time::interval(ROUTER_FLUSH_INTERVAL);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            biased;
            () = token.cancelled() => {
                tracing::debug!("router flush task cancelled");
                return Ok(());
            }
            _ = ticker.tick() => {
                // Gather everything ready to transmit while holding the
                // router lock, then release it before any network I/O.
                let mut outgoing = Vec::new();
                {
                    let mut guard = router.lock().await;
                    guard.process_timeouts();
                    while let Some(msg) = guard.next_to_send() {
                        // Clone the wire packet first; track_sent consumes msg.
                        outgoing.push(msg.packet.clone());
                        guard.track_sent(msg);
                    }
                }

                // Ship each packet; a failed send is logged, not fatal.
                for packet in outgoing {
                    let frame = ToRadio {
                        payload_variant: Some(to_radio::PayloadVariant::Packet(packet)),
                    };
                    if let Err(e) = conn.lock().await.send(frame).await {
                        tracing::warn!(error = %e, "router flush: send error");
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -337,6 +474,10 @@ mod tests {
}
}

/// Builds a broadcast sender for tests; the paired receiver is dropped
/// immediately (emitted signals go nowhere, which the tests don't care about).
fn make_tx() -> broadcast::Sender<GeoSignal> {
    let (tx, _rx) = broadcast::channel(16);
    tx
}

#[test]
fn name_is_kerykeion() {
let c = MeshCollector::new(make_config(vec![]));
Expand All @@ -362,7 +503,7 @@ mod tests {
async fn run_exits_with_no_connections() {
let mut c = MeshCollector::new(make_config(vec![]));
let token = CancellationToken::new();
let result = c.run(token).await;
let result = c.run(make_tx(), token).await;
assert!(result.is_ok(), "should exit cleanly with no connections");
}

Expand Down Expand Up @@ -472,4 +613,50 @@ mod tests {
fn compute_hop_count_limit_exceeds_start() {
assert_eq!(MeshCollector::compute_hop_count(2, 5), None);
}

#[tokio::test]
async fn router_starts_empty() {
    // A freshly constructed collector must have nothing queued for send.
    let collector = MeshCollector::new(make_config(vec![]));
    let router = collector.router();
    let count = router.lock().await.outbound.pending_count();
    assert_eq!(count, 0, "outbound queue should start empty");
}

#[tokio::test]
async fn router_flush_cancels_cleanly() {
use crate::proto::FromRadio;

// WHY: mock connection that never receives and accepts all sends.
struct NoopConn;
impl crate::connection::MeshConnection for NoopConn {
async fn send(&mut self, _: crate::proto::ToRadio) -> Result<(), Error> {
Ok(())
}
async fn recv(&mut self) -> Result<FromRadio, Error> {
std::future::pending().await
}
fn is_connected(&self) -> bool {
true
}
async fn reconnect(&mut self) -> Result<(), Error> {
Ok(())
}
}

let router = Arc::new(Mutex::new(MeshRouter::new(
OutboundQueue::new(),
StoreForward::new(StoreForwardConfig::default()),
DeliveryTracker::new(),
)));
let conn = Arc::new(Mutex::new(NoopConn));
let token = CancellationToken::new();
let task_token = token.clone();

let handle = tokio::spawn(async move { run_router_flush(router, conn, task_token).await });

// Cancel immediately — biased select exits before first tick.
token.cancel();
#[expect(clippy::unwrap_used, reason = "test-only")]
let result = handle.await.unwrap();
assert!(result.is_ok(), "router flush should cancel cleanly");
}
}
Loading
Loading