From 84a84c8e86cbb19577b0d564337ab8c5449345aa Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 14:33:10 +0200 Subject: [PATCH 1/2] error early when not authenticated --- Cargo.lock | 1 + livekit/Cargo.toml | 1 + livekit/specs/signalling-reconnection.allium | 43 ++++++---- livekit/src/rtc_engine/mod.rs | 90 ++++++++++++++++++++ 4 files changed, 119 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b85400203..5a0cffb65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3999,6 +3999,7 @@ dependencies = [ "bytes", "chrono", "futures-util", + "http 1.4.0", "lazy_static", "libloading 0.8.9", "libwebrtc", diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 36fe1af9b..fa222762d 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -53,3 +53,4 @@ anyhow = "1.0.99" test-log = "0.2.18" test-case = "3.3" serial_test = "3.0" +http = "1.1" diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 809557c24..8f8d8e887 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -86,6 +86,7 @@ enum DisconnectCause { | server_leave | signal_severed_during_resume | reconnect_attempts_exhausted + | unauthorized | client_initiated | unknown } @@ -183,7 +184,10 @@ entity EngineConnection { ------------------------------------------------------------ config { - -- Initial connect: extra attempts after the first (Room default). + -- Initial connect: extra attempts after the first (Room default). A + -- validated auth failure (HTTP 401/403, DisconnectCause unauthorized) is + -- surfaced immediately and does NOT consume these retries (R4.2), matching + -- the reconnect loop's terminal-failure handling. join_retries: Integer = 3 -- Per signalling-link connect attempt timeout (SIGNAL_CONNECT_TIMEOUT). 
connect_timeout: Duration = 5.seconds @@ -489,7 +493,7 @@ rule ResumeRechecksLink { if not engine.signal.is_alive: ensures: ResumeAttemptFailed( engine, - server_disconnect: false, + terminal: false, cause: signal_severed_during_resume ) else: @@ -519,28 +523,32 @@ rule ResumeAttemptSucceeds { ensures: EngineResumed(engine) } -rule ResumeAttemptDisconnects { - when: ResumeAttemptFailed(engine, server_disconnect, cause) +rule ResumeAttemptFailsTerminally { + when: ResumeAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: server_disconnect + requires: terminal ensures: engine.reconnect_permission = revoked ensures: engine.status = closed ensures: EngineDisconnected(engine, cause: cause) @guidance - -- A resume that failed because the server sent Leave{Disconnect}: stop, - -- do not escalate or retry. + -- A resume that failed in a NON-RETRYABLE way: stop, do not escalate or + -- retry. Terminal failures are (a) a server Leave{Disconnect} + -- (cause = server_leave) and (b) DELTA R4.2: an authentication failure + -- (cause = unauthorized, a server-validated HTTP 401/403) — the same + -- token will not succeed on retry, so retrying would just hammer the + -- server. `cause` carries which one for the Room. } rule ResumeAttemptEscalates { - when: ResumeAttemptFailed(engine, server_disconnect, cause) + when: ResumeAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: not server_disconnect + requires: not terminal ensures: engine.mode = full ensures: EngineRestarting(engine) ensures: RetryReconnect(engine) @guidance - -- DELTA 2: a failed resume escalates to full reconnect AND emits - -- Restarting on the escalation (previously silent), then retries. + -- DELTA 2: a (retryable) failed resume escalates to full reconnect AND + -- emits Restarting on the escalation (previously silent), then retries. 
} rule RestartAttemptSucceeds { @@ -555,19 +563,22 @@ rule RestartAttemptSucceeds { -- the old only on success, so a failed attempt leaves the old usable. } -rule RestartAttemptDisconnects { - when: RestartAttemptFailed(engine, server_disconnect, cause) +rule RestartAttemptFailsTerminally { + when: RestartAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: server_disconnect + requires: terminal ensures: engine.reconnect_permission = revoked ensures: engine.status = closed ensures: EngineDisconnected(engine, cause: cause) + @guidance + -- Non-retryable full-reconnect failure: a server Leave{Disconnect} or + -- (DELTA R4.2) a validated auth failure (cause = unauthorized). Stop. } rule RestartAttemptRetries { - when: RestartAttemptFailed(engine, server_disconnect, cause) + when: RestartAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: not server_disconnect + requires: not terminal ensures: RetryReconnect(engine) } diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index ce193c1fa..449248973 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -495,6 +495,14 @@ impl EngineInner { match try_connect().await { Ok(res) => return Ok(res), Err(e) => { + // A validated auth failure (401/403) will not succeed on + // retry with the same token — surface it immediately instead + // of burning the remaining join attempts. Same classification + // as the reconnect loop (see `auth_failure_reason`). 
+ if auth_failure_reason(&e).is_some() { + log::warn!("authentication rejected during connect ({e}); not retrying"); + return Err(e); + } let attempt_i = i + 1; if i < max_retries { log::warn!( @@ -911,6 +919,14 @@ impl EngineInner { "server requested disconnect during restart".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!("authentication rejected during restart ({err}); not retrying"); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("restarting connection failed: {}", err); } } @@ -939,6 +955,14 @@ impl EngineInner { "server requested disconnect during resume".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!("authentication rejected during resume ({err}); not retrying"); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("resuming connection failed: {}", err); let mut running_handle = self.running_handle.write(); running_handle.full_reconnect = true; @@ -1091,6 +1115,28 @@ fn leave_disconnect_reason(err: &EngineError) -> Option<DisconnectReason> { None } +/// Inspect a reconnect-attempt error for a genuine authentication/authorization +/// failure (HTTP 401/403). Such a failure will not succeed on retry with the +/// same token, so the reconnect loop should bail out immediately rather than +/// burning every attempt (and hammering the server) with credentials it already +/// knows are rejected. +/// +/// We key on `SignalError::Client(401|403)`, which is produced by the server's +/// `rtc/validate` probe (see [`super`]'s `SignalInner::validate`) — an +/// authoritative classification. We deliberately do NOT key on the raw +/// `WsError::Http` upgrade status, because that can be a fabricated 401 masking a +/// transient server error (e.g. 
a 503 from a saturated node), which IS +/// retryable. A resume that hits a raw 401 simply escalates to a full reconnect, +/// whose connect path runs `validate()` and surfaces the authoritative status. +fn auth_failure_reason(err: &EngineError) -> Option<DisconnectReason> { + if let EngineError::Signal(SignalError::Client(status, _)) = err { + if matches!(status.as_u16(), 401 | 403) { + return Some(DisconnectReason::JoinFailure); + } + } + None +} + #[cfg(test)] mod tests { use super::*; @@ -1139,6 +1185,50 @@ mod tests { } } + #[test] + fn auth_failure_reason_flags_validated_401_and_403() { + // The server's rtc/validate probe surfaces auth failures as Client(4xx). + for status in [401u16, 403] { + let err = EngineError::Signal(SignalError::Client( + http::StatusCode::from_u16(status).unwrap(), + "invalid token".into(), + )); + assert_eq!( + auth_failure_reason(&err), + Some(DisconnectReason::JoinFailure), + "Client({status}) must be treated as a non-retryable auth failure" + ); + } + } + + #[test] + fn auth_failure_reason_ignores_other_client_and_server_errors() { + let not_auth = [ + // Other client errors are not auth failures. + EngineError::Signal(SignalError::Client(http::StatusCode::NOT_FOUND, "".into())), + EngineError::Signal(SignalError::Client( + http::StatusCode::TOO_MANY_REQUESTS, + "".into(), + )), + // Server errors (e.g. a saturated node) are retryable. + EngineError::Signal(SignalError::Server( + http::StatusCode::SERVICE_UNAVAILABLE, + "".into(), + )), + // Generic connectivity/internal errors are retryable. 
+ EngineError::Connection("network".into()), + EngineError::Internal("bug".into()), + EngineError::Signal(SignalError::SendError), + EngineError::Signal(SignalError::Timeout("waiting".into())), + ]; + for err in &not_auth { + assert!( + auth_failure_reason(err).is_none(), + "{err:?} must NOT be treated as an auth failure" + ); + } + } + #[test] fn backoff_nominal_grows_geometrically_then_caps() { // attempt 1 == base, then x2 each step, until it saturates at the cap. From af502d3e5ccaf5a895c7497cc40a9a7b758bfcda Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 15:04:46 +0200 Subject: [PATCH 2/2] sync signalling-reconnection spec to latest --- livekit/specs/signalling-reconnection.allium | 63 ++++++++++++++++---- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 8f8d8e887..9ff110582 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -22,6 +22,25 @@ -- Boundaries: peer-connection / media recovery (RtcSession) and the Room are -- external; modelled via the MediaLayer / Application surfaces below. -- +-- Structure (two layers, kept in one spec — they share the SignalConnection +-- entity, the connect/region/validate path, the auth classification and +-- DisconnectCause, so splitting would force cross-spec entity sharing for little +-- gain; the modular seams are expressed as boundary surfaces instead): +-- +-- LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- entity SignalConnection; rules under "Layer 1" below: +-- * Establishment — initial connect, region fallback, auth rejection. +-- (The engine-side join-retries loop is config-level, see join_retries; +-- its only modelled outcome is auth fast-fail, noted there.) +-- * Liveness & lifecycle — ping/timeout, resume, send/queue, token, close. +-- boundary surfaces: SignalCommands (app), ServerSignalling (SFU). 
+-- +-- LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- entity EngineConnection; rules under "Layer 2" below: initial connection, +-- reconnection causes, start/escalate, per-attempt dispatch, the decoupled +-- resume chain, and attempt outcomes (incl. terminal/non-retryable failures). +-- boundary surfaces: MediaRecovery (RtcSession), RoomConnectionLifecycle (Room). +-- -- Checker note (allium 3.2.4): `allium check` reports residual `status` -- warnings that are CHECKER ARTIFACTS, not spec defects -- every state flagged -- "never assigned" is plainly assigned by a visible `ensures`. Three confirmed @@ -85,7 +104,6 @@ enum DisconnectCause { | peer_connection_failed | server_leave | signal_severed_during_resume - | reconnect_attempts_exhausted | unauthorized | client_initiated | unknown @@ -122,9 +140,11 @@ entity SignalConnection { -- The engine that drives this link's recovery. engine: EngineConnection with signal = this - -- The link is dead if nothing arrived within ping_timeout. Used by the - -- resume's mid-flight liveness re-check (ResumeRechecksLink). - is_alive: last_message_at != null and last_message_at + ping_timeout > now + -- The link is present and usable while connected or mid-resume; `lost` or + -- `closed` mean the stream is gone. Used by the resume's mid-flight re-check + -- (ResumeRechecksLink) and mirrors the implementation's `is_connected()` + -- (stream-present) check rather than ping-timeout liveness. + is_connected: status in {connected, reconnecting} transitions status { connected -> lost -- ping timeout / stream closed @@ -158,6 +178,10 @@ entity EngineConnection { -- (A `permitted -> revoked` terminal transition graph would express this -- structurally, but the entity's `status` graph below takes the single graph -- slot the current checker honours; see the spec header note.) 
+ -- NOTE (spec<->code): the implementation realises this as a `can_reconnect` + -- bool that is only ever set false (never back to true) within a session, + -- always alongside a close() — i.e. it already satisfies this latch and the + -- RevokedImpliesClosed invariant; the enum is the spec-level model. reconnect_permission: permitted | revoked -- 1-based attempt index within the current episode; 0 when connected. @@ -209,6 +233,10 @@ config { -- Rules ------------------------------------------------------------ +-- ########################################################################### +-- ## LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- ########################################################################### + -- === Establishment (DELTA 5: region fallback) =============================== rule SignalConnectionEstablished { @@ -230,6 +258,9 @@ rule PrimaryConnectFailsOnCloud { ensures: RegionFetchRequested(error) @guidance -- DELTA 5: only LiveKit Cloud hosts attempt region fallback. + -- DELTA R3.2: the region list is cached per host with a TTL (default + -- 5s), so a RegionFetchRequested within the TTL is served from cache + -- instead of re-paying the network fetch on every reconnect attempt. } rule PrimaryConnectFailsOnDirect { @@ -341,6 +372,10 @@ rule CloseSignalLink { ensures: connection.status = closed } +-- ########################################################################### +-- ## LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- ########################################################################### + -- === Engine: initial connection ============================================= rule EngineConnects { @@ -438,6 +473,10 @@ rule EscalateReconnect { -- event matching the mode it is in (previously the Room saw Resuming then -- Restarted with no Restarting between). retry_now restarts the backoff so -- the in-flight loop's next attempt fires immediately. 
+ -- NOTE (spec<->code): the implementation emits Restarting lazily, on the + -- next attempt the loop runs in full mode (a `restarting_emitted` latch), + -- rather than eagerly at this escalation event. Net effect is identical + -- (Restarting always precedes the full reconnect). } -- === Engine: per-attempt dispatch =========================================== @@ -490,7 +529,7 @@ rule ResumeAwaitsPeerConnections { rule ResumeRechecksLink { when: PeerConnectionsReconnected(engine) requires: engine.status = reconnecting - if not engine.signal.is_alive: + if not engine.signal.is_connected: ensures: ResumeAttemptFailed( engine, terminal: false, @@ -598,12 +637,16 @@ rule ReconnectExhausted { requires: engine.status = reconnecting requires: engine.attempt >= config.max_reconnect_attempts ensures: engine.status = closed - ensures: EngineDisconnected(engine, cause: reconnect_attempts_exhausted) + ensures: EngineDisconnected(engine, cause: engine.reconnect_cause) @guidance - -- DELTA 2: on giving up, EngineDisconnected reports - -- reconnect_attempts_exhausted; the engine's reconnect_cause also remains - -- available to the Room for the underlying reason, instead of `unknown`. - -- Always emitted so the Room leaves the Reconnecting state. + -- DELTA 2: on giving up, EngineDisconnected reports the cause that STARTED + -- this episode (engine.reconnect_cause), threaded through instead of a + -- generic `unknown`. It is always emitted so the Room leaves Reconnecting + -- rather than hanging there. + -- NOTE (spec<->code): there is no dedicated "attempts exhausted" disconnect + -- reason in the wire protocol (proto DisconnectReason), so the original + -- cause is what surfaces. A distinct exhausted reason would require a + -- protocol addition. } ------------------------------------------------------------