From d2e2076db62dbb43987489fec98b069ef57fd7fb Mon Sep 17 00:00:00 2001
From: Saurabh Chauhan
Date: Mon, 8 Jun 2026 12:42:58 +0530
Subject: [PATCH] feat(OBE-9898): always require a token; read site_version claim
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The `vector` source's auth had a `require_token` config that, when set to
false, let token-less requests through for staged migration. That bypass
applied to *every* sink — including upgraded sites that should never skip
auth — so it was a blunt, unsafe instrument.

This removes `require_token` entirely: a token is now always required
(`authenticate` rejects a missing `authorization` header unconditionally),
which closes the "upgraded sites skipping auth" gap.

Instead of a global flag, the source now reads the per-token `site_version`
claim that the manager stamps onto every site token (OBE-9896) and surfaces
it for telemetry / future per-version policy. Older tokens that predate the
claim are tolerated (the claim is optional).

Notes:
- `AuthConfig` uses `#[serde(deny_unknown_fields)]`, so this is a breaking
  change: any config TOML that *sets* `require_token` will now fail to
  parse. The manager never emitted that field, so generated configs are
  unaffected.
- The `site_version` claim name is the cross-repo contract with the
  manager's `SiteVersionClaim`; both sides pin the literal "site_version"
  with a unit test so a rename can't silently break compatibility.
Co-Authored-By: Claude Opus 4.8 --- src/sinks/vector/config.rs | 4 +- src/sources/util/jwt_auth.rs | 146 ++++++++++++++++++++--------------- src/sources/vector/mod.rs | 111 ++++---------------------- src/test_util/jwt_auth.rs | 1 - 4 files changed, 99 insertions(+), 163 deletions(-) diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index 447bce0ff..a71ed776f 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -189,8 +189,8 @@ async fn healthcheck( // Build the request manually so we can attach the same authorization // header that `Service::call` attaches to `push_events`. Without this, - // a source configured with `require_token = true` would refuse this - // healthcheck even though the sink has valid credentials. + // a source that requires a token would refuse this healthcheck even + // though the sink has valid credentials. let mut request = tonic::Request::new(proto::HealthCheckRequest {}); if let Some(auth) = service.auth() { let bearer = auth.bearer_token().map_err(|message| { diff --git a/src/sources/util/jwt_auth.rs b/src/sources/util/jwt_auth.rs index 786518f7f..faea61e40 100644 --- a/src/sources/util/jwt_auth.rs +++ b/src/sources/util/jwt_auth.rs @@ -33,6 +33,19 @@ pub(crate) const AUTH_FIELD_NAME_TAG: &str = "auth_field_name"; /// Metric tag key for the auth field value. pub(crate) const AUTH_FIELD_VALUE_TAG: &str = "auth_field_value"; +/// JWT claim carrying the site's version at token-issue time, stamped by the +/// manager's auth-service (OBE-9896). Read for telemetry / future per-version +/// policy; absent for older sites whose tokens predate the claim. Must stay in +/// lockstep with the manager's `SiteVersionClaim` (`common/utils/auth`). +const SITE_VERSION_CLAIM: &str = "site_version"; + +/// Extract the `site_version` claim from a validated token's claims, if present +/// and a string. 
Returns `None` for tokens minted before the claim existed (or +/// of an unexpected type) — the caller must tolerate its absence. +fn extract_site_version(claims: &Claims) -> Option<&str> { + claims.get(SITE_VERSION_CLAIM).and_then(Value::as_str) +} + /// Errors returned by [`Auth::authenticate`] (request-level). #[derive(Debug, PartialEq)] pub enum AuthError { @@ -881,23 +894,6 @@ pub struct AuthConfig { /// filtered out. When absent, no per-event filtering is applied. #[serde(default, skip_serializing_if = "Option::is_none")] pub value_path: Option, - - /// When `true`, requests without an `authorization` header are rejected - /// with `Unauthenticated`. Defaults to `true` (secure by default). - /// - /// Set to `false` to opt into the legacy fallback that accepts requests - /// without a token (useful during a staged migration where older agents - /// haven't been updated yet). - /// - /// Applies to both `push_events` and `health_check` RPCs so a sink with - /// auth misconfigured fails its healthcheck rather than silently - /// bypassing token validation. - #[serde(default = "default_require_token")] - pub require_token: bool, -} - -fn default_require_token() -> bool { - true } /// Replace serde's flattened-enum error with an actionable message naming the @@ -972,7 +968,6 @@ impl AuthConfig { jwks_validations, membership_claim, value_path, - require_token: self.require_token, }))) } } @@ -1003,7 +998,6 @@ struct Inner { jwks_validations: HashMap, membership_claim: Option, value_path: Option, - require_token: bool, } /// Per-request auth context returned by a successful [`Auth::authenticate`] call. @@ -1146,11 +1140,10 @@ impl Auth { /// /// # Returns /// - /// * `Ok(None)` — `authorization` was absent and `require_token` is `false`; request is - /// accepted without per-event filtering (legacy / migration mode). /// * `Ok(Some(ctx))` — token is valid. 
Use [`AuthContext::is_authorized`] for per-event /// membership checks against the extracted allowed-values list. - /// * `Err(AuthError::InvalidToken)` — `authorization` was absent but required, the token + /// * `Err(AuthError::InvalidToken)` — `authorization` was absent (a token is always + /// required — OBE-9898 removed the legacy `require_token = false` bypass), or the token /// is malformed/expired/bad-signature, wrong issuer/audience, unsupported algorithm, /// or the membership claim is missing. pub async fn authenticate( @@ -1158,13 +1151,12 @@ impl Auth { authorization: Option<&str>, ) -> Result, AuthError> { let Some(auth_value) = authorization else { - if self.0.require_token { - return Err(AuthError::InvalidToken( - "authorization header is required", - )); - } - debug!(message = "No authorization header; allowing request."); - return Ok(None); + // A token is always required (OBE-9898 removed the legacy + // `require_token = false` bypass that let token-less sinks through — + // it also let *upgraded* sites skip auth, which this closes). + return Err(AuthError::InvalidToken( + "authorization header is required", + )); }; let token = strip_bearer_prefix(auth_value) @@ -1219,6 +1211,13 @@ impl Auth { } }; + // Surface the site's version (OBE-9896 stamps `site_version` as a JWT + // claim) for telemetry / future per-version policy. Absent for older + // sites whose tokens predate the claim. + if let Some(site_version) = extract_site_version(&token_data.claims) { + debug!(message = "Authenticated site push.", site_version = %site_version); + } + let allowed_values = inner.membership_claim .as_ref() .map(|c| c.extract(&token_data.claims)) @@ -1272,8 +1271,8 @@ mod tests { }; // Construct a baseline `AuthConfig` from the given authority, using the - // permissive defaults the tests below want (no issuer/audience/value_path, - // `require_token = false`). Individual tests override fields as needed. 
+ // permissive defaults the tests below want (no issuer/audience/value_path). + // Individual tests override fields as needed. fn cfg_with(authority: Authority) -> AuthConfig { AuthConfig { authority, @@ -1282,7 +1281,6 @@ mod tests { membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())), value_path: None, algorithms: None, - require_token: false, } } @@ -1398,15 +1396,6 @@ mod tests { // ── Auth::authenticate ─────────────────────────────────────────────────── - #[tokio::test] - async fn no_auth_header_allows_legacy_client_when_require_token_false() { - // The shared `build_auth` helper now matches production default - // (require_token = true), so explicitly opt out to test legacy mode. - let auth = build_auth_with_require_token(false).await; - let result = auth.authenticate(None).await; - assert!(matches!(result, Ok(None))); - } - #[tokio::test] async fn valid_token_returns_allowed_values() { let auth = build_auth(None, None).await; @@ -1659,41 +1648,74 @@ mod tests { assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok()); } - // ── require_token enforcement ──────────────────────────────────────────── - - async fn build_auth_with_require_token(require: bool) -> Auth { - let mut cfg = cfg_with(inline_public_key()); - cfg.require_token = require; - cfg.build().await.unwrap() - } + // ── token is always required (OBE-9898) ────────────────────────────────── #[tokio::test] - async fn require_token_false_allows_missing_authorization() { - let auth = build_auth_with_require_token(false).await; - assert!(matches!(auth.authenticate(None).await, Ok(None))); - } - - #[tokio::test] - async fn require_token_true_rejects_missing_authorization() { - let auth = build_auth_with_require_token(true).await; + async fn missing_authorization_is_rejected() { + // No `require_token = false` escape hatch any more: a missing header is + // always rejected, so upgraded sites can never skip auth. 
+ let auth = build_auth(None, None).await; let result = auth.authenticate(None).await; assert!(matches!(result, Err(AuthError::InvalidToken(_)))); } #[tokio::test] - async fn require_token_true_accepts_valid_token() { - let auth = build_auth_with_require_token(true).await; + async fn valid_token_is_accepted() { + let auth = build_auth(None, None).await; let token = make_token(HashMap::new()); assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok()); } #[tokio::test] - async fn require_token_true_still_rejects_invalid_token() { - let auth = build_auth_with_require_token(true).await; + async fn invalid_token_is_rejected() { + let auth = build_auth(None, None).await; let result = auth.authenticate(Some("Bearer not.a.jwt")).await; assert!(matches!(result, Err(AuthError::InvalidToken(_)))); } + // ── site_version claim (OBE-9896 ↔ OBE-9898 contract) ───────────────────── + + #[test] + fn site_version_claim_name_is_stable() { + // The manager (OBE-9896) stamps this exact claim name onto every site + // token; a rename here silently breaks cross-repo compatibility. Keep in + // lockstep with the manager's `SiteVersionClaim` ("site_version"). + assert_eq!(SITE_VERSION_CLAIM, "site_version"); + } + + #[test] + fn extract_site_version_reads_the_claim() { + let mut claims = Claims::new(); + claims.insert(SITE_VERSION_CLAIM.to_string(), Value::from("2.42.0")); + assert_eq!(extract_site_version(&claims), Some("2.42.0")); + } + + #[test] + fn extract_site_version_absent_is_none() { + // Older sites whose tokens predate the claim must not error out. + let claims = Claims::new(); + assert_eq!(extract_site_version(&claims), None); + } + + #[test] + fn extract_site_version_wrong_type_is_none() { + // A non-string value is ignored rather than mis-parsed. 
+ let mut claims = Claims::new(); + claims.insert(SITE_VERSION_CLAIM.to_string(), Value::from(42)); + assert_eq!(extract_site_version(&claims), None); + } + + #[tokio::test] + async fn authenticate_surfaces_site_version_when_present() { + // End-to-end: a real signed token carrying `site_version` validates and + // the claim is read off it (the OBE-9896 → OBE-9898 hand-off). + let auth = build_auth(None, None).await; + let mut extra = HashMap::new(); + extra.insert(SITE_VERSION_CLAIM, Value::from("2.42.0")); + let token = make_token(extra); + assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok()); + } + #[test] fn default_algorithms_covers_rs_and_ps_family() { let algos = default_algorithms(); @@ -1729,7 +1751,6 @@ mod tests { async fn build_no_membership_auth() -> Auth { let mut cfg = cfg_with(inline_public_key()); cfg.membership_claim = None; - cfg.require_token = true; cfg.build().await.unwrap() } @@ -2085,7 +2106,6 @@ pub_key.type = "inline" pub_key.value = "pem" membership_claim = "tenants" issuer = "https://issuer.example.com/" -require_token = false "#; let cfg: AuthConfig = toml::from_str(toml).unwrap(); assert!(matches!( @@ -2094,7 +2114,6 @@ require_token = false )); assert_eq!(cfg.membership_claim, Some(MembershipClaimConfig::Identity("tenants".to_string()))); assert_eq!(cfg.issuer.as_deref(), Some("https://issuer.example.com/")); - assert!(!cfg.require_token); } // ── MembershipClaimConfig serde ────────────────────────────────────────── @@ -2386,7 +2405,6 @@ pub_key.value = "pem" membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())), value_path: None, algorithms: None, - require_token: false, } } diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index b41222a54..e4b7456d9 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -114,8 +114,8 @@ impl AuthBatchStats { impl Service { /// Run request-level JWT validation against an inbound gRPC request. 
/// - /// Shared between `push_events` and `health_check` so both RPCs honor the - /// same `require_token` enforcement and reject the same set of bad tokens. + /// Shared between `push_events` and `health_check` so both RPCs require a + /// valid token and reject the same set of bad tokens. async fn validate_auth_header( &self, request: &Request, @@ -254,8 +254,8 @@ impl proto::Service for Service { &self, request: Request, ) -> Result, Status> { - // Apply the same JWT validation as push_events — same auth posture, - // including `require_token` enforcement when configured. + // Apply the same JWT validation as push_events — same auth posture, so + // a misconfigured/unauthenticated sink fails its healthcheck. self.validate_auth_header(&request).await?; let message = proto::HealthCheckResponse { @@ -973,24 +973,6 @@ value = "{token}""# ); } - #[tokio::test] - async fn legacy_sink_without_auth_is_accepted() { - // Source has auth configured with require_token=false (legacy mode); - // sink sends no token → request allowed through. - let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = false"#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - assert_eq!( - run_auth_pair(&source_auth, "").await, - BatchStatus::Delivered - ); - } - #[tokio::test] async fn invalid_token_is_rejected() { let source_auth = format!( @@ -1044,27 +1026,7 @@ value = "{token}""# ); } - #[tokio::test] - async fn legacy_sink_with_value_path_configured_is_accepted() { - // Source has value_path with require_token=false (legacy mode); sink sends - // no token → per-event filtering is skipped entirely, all events pass through. 
- let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = false -[auth.value_path] -default = "tenant_id""#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - assert_eq!( - run_auth_pair(&source_auth, "").await, - BatchStatus::Delivered - ); - } - - // ── require_token + healthcheck integration ────────────────────────── + // ── healthcheck integration ────────────────────────────────────────── /// Build a source+sink pair and return the result of the sink's healthcheck. async fn run_healthcheck_pair( @@ -1093,50 +1055,9 @@ default = "tenant_id""#, } #[tokio::test] - async fn require_token_source_rejects_unauthenticated_push() { - // Source: require_token = true (explicit); sink: no auth → rejected. - let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - assert_eq!( - run_auth_pair(&source_auth, "").await, - BatchStatus::Rejected - ); - } - - #[tokio::test] - async fn require_token_source_accepts_authenticated_push() { - // Source: require_token = true (explicit); sink: valid token → delivered. - let token = make_token(HashMap::new()); - let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - let sink_auth = format!( - r#"[auth] -[auth.token] -type = "inline" -value = "{token}""# - ); - assert_eq!( - run_auth_pair(&source_auth, &sink_auth).await, - BatchStatus::Delivered - ); - } - - #[tokio::test] - async fn default_require_token_rejects_request_without_token() { - // Source TOML omits `require_token` — the default is `true`, - // so a sink with no token must be rejected. 
+ async fn source_rejects_request_without_token() { + // Auth is configured; a token is always required, so a sink that + // sends none is rejected (no `require_token = false` escape hatch). let source_auth = format!( r#"[auth] pub_key.type = "inline" @@ -1151,9 +1072,9 @@ membership_claim = "site_ids""#, } #[tokio::test] - async fn default_require_token_accepts_request_with_token() { - // Source TOML omits `require_token` — default `true`. Sink sends - // a valid token; request flows through. + async fn source_accepts_request_with_token() { + // Auth is configured; the sink sends a valid token, so the request + // flows through. let token = make_token(HashMap::new()); let source_auth = format!( r#"[auth] @@ -1175,14 +1096,13 @@ value = "{token}""# } #[tokio::test] - async fn healthcheck_succeeds_when_sink_sends_token_to_require_token_source() { + async fn healthcheck_succeeds_when_sink_sends_token() { let token = make_token(HashMap::new()); let source_auth = format!( r#"[auth] pub_key.type = "inline" pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, +membership_claim = "site_ids""#, TEST_PUBLIC_KEY.replace('\n', "\\n") ); let sink_auth = format!( @@ -1195,13 +1115,12 @@ value = "{token}""# } #[tokio::test] - async fn healthcheck_fails_when_sink_omits_token_to_require_token_source() { + async fn healthcheck_fails_when_sink_omits_token() { let source_auth = format!( r#"[auth] pub_key.type = "inline" pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, +membership_claim = "site_ids""#, TEST_PUBLIC_KEY.replace('\n', "\\n") ); assert!(run_healthcheck_pair(&source_auth, "").await.is_err()); diff --git a/src/test_util/jwt_auth.rs b/src/test_util/jwt_auth.rs index bdf7bd864..ffbd5ec88 100644 --- a/src/test_util/jwt_auth.rs +++ b/src/test_util/jwt_auth.rs @@ -112,7 +112,6 @@ pub async fn build_auth(issuer: Option<&str>, audience: Option>) -> Au membership_claim: 
Some(MembershipClaimConfig::Identity("site_ids".to_string())), value_path: None, algorithms: None, - require_token: true, } .build() .await