Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aimdb-knx-connector/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- **Spec-conformant TUNNELING_REQUEST retransmission — `TunnelConfig::ack_retransmits` (036 W4, default `1`).** When a tracked outbound telegram's ACK does not arrive within `ack_timeout_ms`, the engine now retransmits the byte-identical frame (same sequence counter, buffered in the pending-ACK slot per KNXnet/IP 3.8.4) and, when the repeat also goes unanswered, reports `Action::AckTimeout` **and tears the connection down** — so subsequent commands queue for the re-handshake instead of being sent into a dead tunnel. Hardware-bench evidence motivating this: ten button-press writes issued during a link outage's heartbeat-detection window (up to ~65 s) were silently lost with only warnings; with retransmission the loss window shrinks to ~2× `ack_timeout_ms`. `ack_retransmits: 0` restores the previous expire-and-warn behavior (no retransmit, no disconnect, no frame buffering — though the 16-slot frame capacity, ~4.5 KiB, is statically reserved either way on `heapless`). The retransmit delay is `ack_timeout_ms` (default 3 s, the constant both pre-engine implementations used); set it to `1_000` for strict spec timing. Covered by engine unit tests and a fake-gateway test that drops the first ACK and asserts the identical repeat.

### Fixed

- **Heartbeat-response liveness — a dead send path or expired gateway channel now reconnects (review follow-up to #135).** The engine tracks each CONNECTIONSTATE_REQUEST and drops the connection when the gateway's CONNECTIONSTATE_RESPONSE doesn't arrive within the new `TunnelConfig::heartbeat_response_timeout_ms` (default 10 s, the KNX spec timeout) or reports a non-zero status (e.g. the gateway expired the channel during an outage). This restores the old tokio client's recovery from silently-failing sends — the recv path of an unconnected UDP socket never errors, so without it a route flap left the tunnel `Connected` forever with a stale channel id — and adds genuine liveness detection on both runtimes.
Expand Down
83 changes: 83 additions & 0 deletions aimdb-knx-connector/src/tokio_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,89 @@ mod tests {
frame
}

/// TUNNELING_ACK from the gateway: header + connection header.
fn gateway_ack(channel_id: u8, seq: u8) -> Vec<u8> {
vec![
0x06, 0x10, 0x04, 0x21, 0x00, 0x0A, // header, total len 10
0x04, channel_id, seq, 0x00, // connection header, status OK
]
}

/// W4: the gateway drops the first ACK; the client retransmits the
/// byte-identical TUNNELING_REQUEST (same sequence counter, KNXnet/IP
/// 3.8.4) after the ACK timeout, and the tunnel survives once the repeat
/// is ACKed. Real-time test: waits out the 3 s default ACK timeout.
#[tokio::test]
async fn dropped_ack_triggers_identical_retransmit() {
let gateway = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let gateway_port = gateway.local_addr().unwrap().port();

let (command_tx, mut telegram_rx, connection_future) =
KnxConnectorImpl::build_internal(&format!("knx://127.0.0.1:{}", gateway_port), 8)
.await
.unwrap();
let task = tokio::spawn(connection_future);

let mut buf = [0u8; 1024];
let (len, client_addr) = timeout(RECV_TIMEOUT, gateway.recv_from(&mut buf))
.await
.expect("no CONNECT_REQUEST")
.unwrap();
assert_eq!(service_type_of(&buf[..len]), 0x0205);
gateway
.send_to(&connect_response(7, 0), client_addr)
.await
.unwrap();

// Outbound write; deliberately do NOT ACK the first request.
let mut data = heapless::Vec::new();
data.push(0x01).unwrap();
command_tx
.send(GroupWrite {
group_addr: "1/0/8".parse().unwrap(),
data,
})
.await
.unwrap();
let (len, _) = timeout(RECV_TIMEOUT, gateway.recv_from(&mut buf))
.await
.expect("no TUNNELING_REQUEST")
.unwrap();
let first = buf[..len].to_vec();
assert_eq!(service_type_of(&first), 0x0420);

// The retransmit arrives after the ACK timeout, byte-identical.
let (len, _) = timeout(Duration::from_secs(8), gateway.recv_from(&mut buf))
.await
.expect("no retransmit after dropped ACK")
.unwrap();
assert_eq!(&buf[..len], &first[..]);

// ACK the repeat: the tunnel stays up — an inbound telegram still
// round-trips on the same channel (a disconnect would have produced
// a CONNECT_REQUEST here instead of an ACK).
gateway
.send_to(&gateway_ack(7, 0), client_addr)
.await
.unwrap();
gateway
.send_to(&inbound_group_write(7, 42), client_addr)
.await
.unwrap();
let (len, _) = timeout(RECV_TIMEOUT, gateway.recv_from(&mut buf))
.await
.expect("no TUNNELING_ACK for inbound telegram")
.unwrap();
assert_eq!(service_type_of(&buf[..len]), 0x0421);
let (topic, _) = timeout(RECV_TIMEOUT, telegram_rx.recv())
.await
.expect("no telegram routed")
.unwrap();
assert_eq!(topic, "1/0/7");

task.abort();
}

/// Full roundtrip against a scripted fake gateway on localhost UDP:
/// handshake, inbound telegram → `KnxSource` channel, outbound command →
/// TUNNELING_REQUEST on the wire (then ACKed).
Expand Down
189 changes: 174 additions & 15 deletions aimdb-knx-connector/src/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,12 @@ pub enum Action {
/// The engine has entered backoff and will emit the next CONNECT_REQUEST
/// as a `Send` action once the backoff deadline passes.
ResetSocket,
/// An outbound telegram was never acknowledged within the ACK timeout.
/// Log-only: the engine keeps the connection (matching both previous
/// implementations, which expired and warned without retransmitting).
/// An outbound telegram was never acknowledged: every send attempt
/// (1 + [`TunnelConfig::ack_retransmits`]) expired unanswered. Log-only
/// for the transport. With retransmits enabled the engine also tears the
/// connection down afterwards (KNXnet/IP 3.8.4); with
/// `ack_retransmits == 0` it keeps the connection, matching both
/// pre-retransmit implementations.
AckTimeout { seq: u8 },
}

Expand Down Expand Up @@ -124,10 +127,20 @@ pub struct TunnelConfig {
pub local_endpoint: LocalEndpoint,
/// CONNECT_RESPONSE wait before giving up and backing off.
pub connect_timeout_ms: Millis,
/// Pending ACK lifetime before it is reported via [`Action::AckTimeout`].
/// Pending ACK lifetime before a send attempt is considered unanswered.
pub ack_timeout_ms: Millis,
/// Cadence of the pending-ACK expiry sweep.
pub ack_sweep_ms: Millis,
/// TUNNELING_REQUEST retransmissions after an ACK timeout before the
/// telegram is reported lost via [`Action::AckTimeout`].
///
/// KNXnet/IP 3.8.4 repeats the identical frame once and tears the
/// connection down when the repeat also goes unanswered — `1` (the
/// default) is that behavior, with the retransmit delay being
/// [`ack_timeout_ms`](Self::ack_timeout_ms) (set that to `1_000` for
/// strict spec timing). `0` restores the pre-retransmit behavior: expire
/// and warn only, no disconnect, and no frame bytes are buffered.
pub ack_retransmits: u8,
/// CONNECTIONSTATE_REQUEST keepalive cadence.
pub heartbeat_ms: Millis,
/// CONNECTIONSTATE_RESPONSE wait before the connection is considered
Expand All @@ -147,24 +160,38 @@ impl Default for TunnelConfig {
connect_timeout_ms: 5_000,
ack_timeout_ms: 3_000,
ack_sweep_ms: 500,
ack_retransmits: 1,
heartbeat_ms: 55_000,
heartbeat_response_timeout_ms: 10_000,
reconnect_backoff_ms: 5_000,
}
}
}

/// Pending outbound ACKs tracked per connection (seq → sent-at).
/// Pending outbound ACKs tracked per connection (seq → [`PendingAck`]).
const PENDING_ACK_CAPACITY: usize = 16;

/// A tracked outbound TUNNELING_REQUEST awaiting its TUNNELING_ACK.
#[derive(Debug)]
struct PendingAck {
sent_at: Millis,
/// Send attempts still allowed after the next expiry.
retries_left: u8,
/// The sent frame, buffered for byte-identical retransmission (KNXnet/IP
/// 3.8.4 repeats the same frame, same sequence counter). Left empty when
/// `ack_retransmits == 0`: the slot capacity is statically reserved
/// either way (heapless), but no bytes are copied.
frame: Frame,
}

/// Per-connection state; only exists while the handshake has succeeded, so
/// "connected" needs no separate flag and the keepalive cannot fire while
/// disconnected.
#[derive(Debug)]
struct ChannelState {
channel_id: u8,
outbound_seq: u8,
pending_acks: heapless::FnvIndexMap<u8, Millis, PENDING_ACK_CAPACITY>,
pending_acks: heapless::FnvIndexMap<u8, PendingAck, PENDING_ACK_CAPACITY>,
next_heartbeat: Millis,
/// Set when a CONNECTIONSTATE_REQUEST goes out; cleared by the gateway's
/// OK response. A pending entry older than
Expand Down Expand Up @@ -335,15 +362,25 @@ impl TunnelEngine {
let seq = state.next_outbound_seq();
let cemi = build_group_write_cemi(cmd.group_addr, &cmd.data);
let frame = build_tunneling_request(state.channel_id, seq, &cemi);
if state.pending_acks.insert(seq, now).is_err() {
let pending = PendingAck {
sent_at: now,
retries_left: self.cfg.ack_retransmits,
frame: if self.cfg.ack_retransmits > 0 {
frame.clone()
} else {
Frame::new()
},
};
if let Err((_, pending)) = state.pending_acks.insert(seq, pending) {
// Map full (burst deeper than PENDING_ACK_CAPACITY): evict the
// oldest entry and report it now, so no unacknowledged telegram
// is ever silently untracked.
// is ever silently untracked. Eviction is overflow, not confirmed
// loss, so it never tears the connection down.
if let Some((&oldest, _)) = state.pending_acks.iter().next() {
state.pending_acks.remove(&oldest);
self.actions.push_back(Action::AckTimeout { seq: oldest });
}
let _ = state.pending_acks.insert(seq, now);
let _ = state.pending_acks.insert(seq, pending);
}
self.actions.push_back(Action::Send {
frame,
Expand Down Expand Up @@ -378,6 +415,9 @@ impl TunnelEngine {
/// Fire any deadlines that have passed. Call at the top of every loop
/// iteration, before draining actions.
pub fn poll(&mut self, now: Millis) {
// Set inside the `Connected` arm (where `self.phase` is borrowed) and
// applied after the match — same pattern as `handle_datagram`.
let mut drop_connection = false;
match &mut self.phase {
Phase::Backoff { until } => {
if now >= *until {
Expand Down Expand Up @@ -419,19 +459,44 @@ impl TunnelEngine {
}
if now >= state.next_ack_sweep {
let mut expired: heapless::Vec<u8, PENDING_ACK_CAPACITY> = heapless::Vec::new();
for (&seq, &sent_at) in state.pending_acks.iter() {
if now.saturating_sub(sent_at) > self.cfg.ack_timeout_ms {
for (&seq, pending) in state.pending_acks.iter() {
if now.saturating_sub(pending.sent_at) > self.cfg.ack_timeout_ms {
let _ = expired.push(seq);
}
}
for seq in &expired {
state.pending_acks.remove(seq);
self.actions.push_back(Action::AckTimeout { seq: *seq });
let Some(pending) = state.pending_acks.get_mut(seq) else {
continue;
};
if pending.retries_left > 0 {
// KNXnet/IP 3.8.4: repeat the identical frame
// (same sequence counter) and re-arm the timeout.
pending.retries_left -= 1;
pending.sent_at = now;
self.actions.push_back(Action::Send {
frame: pending.frame.clone(),
await_ack: Some(*seq),
});
} else {
state.pending_acks.remove(seq);
self.actions.push_back(Action::AckTimeout { seq: *seq });
// Spec: tear the connection down once the repeat
// also went unanswered, so queued commands stop
// being sent into a dead tunnel.
// `ack_retransmits == 0` keeps the legacy
// warn-and-continue behavior.
if self.cfg.ack_retransmits > 0 {
drop_connection = true;
}
}
}
state.next_ack_sweep = now + self.cfg.ack_sweep_ms;
}
}
}
if drop_connection {
self.handle_socket_error(now);
}
}

/// Earliest deadline the transport must wake the engine for.
Expand Down Expand Up @@ -711,16 +776,27 @@ fn parse_telegram(cemi_data: &[u8]) -> Option<(GroupAddress, Vec<u8>)> {
mod tests {
use super::*;

/// Legacy-mode config: no retransmits, expire-and-warn only. Most tests
/// pin this pre-retransmit contract; the retransmit tests use
/// [`RETRANSMIT_CFG`].
const CFG: TunnelConfig = TunnelConfig {
local_endpoint: LocalEndpoint::Nat,
connect_timeout_ms: 5_000,
ack_timeout_ms: 3_000,
ack_sweep_ms: 500,
ack_retransmits: 0,
heartbeat_ms: 55_000,
heartbeat_response_timeout_ms: 10_000,
reconnect_backoff_ms: 5_000,
};

/// [`CFG`] with the spec-conformant retransmit knob (the shipping
/// default) enabled.
const RETRANSMIT_CFG: TunnelConfig = TunnelConfig {
ack_retransmits: 1,
..CFG
};

fn drain(engine: &mut TunnelEngine) -> Vec<Action> {
let mut actions = Vec::new();
while let Some(a) = engine.next_action() {
Expand Down Expand Up @@ -757,7 +833,12 @@ mod tests {

/// Drive a fresh engine to Connected; returns it with actions drained.
fn connected_engine(now: Millis) -> TunnelEngine {
let mut engine = TunnelEngine::new(CFG.clone(), now);
connected_engine_with(CFG.clone(), now)
}

/// [`connected_engine`] with an explicit config.
fn connected_engine_with(cfg: TunnelConfig, now: Millis) -> TunnelEngine {
let mut engine = TunnelEngine::new(cfg, now);
engine.poll(now);
let actions = drain(&mut engine);
assert_eq!(actions.len(), 1);
Expand Down Expand Up @@ -1024,10 +1105,88 @@ mod tests {
engine.poll(1_000);
assert!(drain(&mut engine).is_empty());

// First sweep past sent_at + ack_timeout expires it.
// First sweep past sent_at + ack_timeout expires it. Legacy mode
// (ack_retransmits == 0): warn only, the connection is kept.
engine.poll(10 + CFG.ack_timeout_ms + CFG.ack_sweep_ms);
let actions = drain(&mut engine);
assert_eq!(actions, vec![Action::AckTimeout { seq: 0 }]);
assert!(engine.is_connected());
}

#[test]
fn ack_timeout_retransmits_identical_frame_then_disconnects() {
let addr: GroupAddress = "1/0/8".parse().unwrap();
let mut engine = connected_engine_with(RETRANSMIT_CFG.clone(), 0);
let mut data = heapless::Vec::new();
data.push(0x01).unwrap();
engine.handle_command(
GroupWrite {
group_addr: addr,
data,
},
10,
);
let actions = drain(&mut engine);
let [Action::Send {
frame: original,
await_ack: Some(0),
}] = &actions[..]
else {
panic!("expected tracked TUNNELING_REQUEST send, got {actions:?}");
};
let original = original.clone();

// First expiry: the identical frame goes out again (same sequence
// counter, KNXnet/IP 3.8.4), no AckTimeout, connection kept.
engine.poll(10 + CFG.ack_timeout_ms + CFG.ack_sweep_ms);
let actions = drain(&mut engine);
assert_eq!(
actions,
vec![Action::Send {
frame: original,
await_ack: Some(0),
}]
);
assert!(engine.is_connected());

// Second expiry: reported lost, and the connection is torn down so
// subsequent commands queue instead of vanishing into a dead tunnel.
let t = 10 + 2 * (CFG.ack_timeout_ms + CFG.ack_sweep_ms);
engine.poll(t);
let actions = drain(&mut engine);
assert_eq!(
actions,
vec![Action::AckTimeout { seq: 0 }, Action::ResetSocket]
);
assert!(!engine.is_connected());
assert_eq!(engine.next_deadline(), t + CFG.reconnect_backoff_ms);
}

#[test]
fn retransmitted_request_acked_keeps_connection() {
let addr: GroupAddress = "1/0/8".parse().unwrap();
let mut engine = connected_engine_with(RETRANSMIT_CFG.clone(), 0);
let mut data = heapless::Vec::new();
data.push(0x01).unwrap();
engine.handle_command(
GroupWrite {
group_addr: addr,
data,
},
10,
);
drain(&mut engine);

// Let the first send expire and the retransmit go out…
engine.poll(10 + CFG.ack_timeout_ms + CFG.ack_sweep_ms);
assert_eq!(drain(&mut engine).len(), 1);

// …then the (late) ACK lands: pending cleared, no AckTimeout at any
// later sweep, connection kept.
engine.handle_datagram(&gateway_ack(7, 0), 4_000);
engine.poll(20_000);
assert!(drain(&mut engine).is_empty());
assert!(engine.is_connected());
}

#[test]
Expand Down
Loading