Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sinks/vector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ async fn healthcheck(

// Build the request manually so we can attach the same authorization
// header that `Service::call` attaches to `push_events`. Without this,
// a source configured with `require_token = true` would refuse this
// healthcheck even though the sink has valid credentials.
// a source that requires a token would refuse this healthcheck even
// though the sink has valid credentials.
let mut request = tonic::Request::new(proto::HealthCheckRequest {});
if let Some(auth) = service.auth() {
let bearer = auth.bearer_token().map_err(|message| {
Expand Down
146 changes: 82 additions & 64 deletions src/sources/util/jwt_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ pub(crate) const AUTH_FIELD_NAME_TAG: &str = "auth_field_name";
/// Metric tag key for the auth field value.
pub(crate) const AUTH_FIELD_VALUE_TAG: &str = "auth_field_value";

/// JWT claim carrying the site's version at token-issue time, stamped by the
/// manager's auth-service (OBE-9896). Read for telemetry / future per-version
/// policy; absent for older sites whose tokens predate the claim. Must stay in
/// lockstep with the manager's `SiteVersionClaim` (`common/utils/auth`).
const SITE_VERSION_CLAIM: &str = "site_version";

/// Extract the `site_version` claim from a validated token's claims, if present
/// and a string. Returns `None` for tokens minted before the claim existed (or
/// of an unexpected type) — the caller must tolerate its absence.
fn extract_site_version(claims: &Claims) -> Option<&str> {
claims.get(SITE_VERSION_CLAIM).and_then(Value::as_str)
}

/// Errors returned by [`Auth::authenticate`] (request-level).
#[derive(Debug, PartialEq)]
pub enum AuthError {
Expand Down Expand Up @@ -881,23 +894,6 @@ pub struct AuthConfig {
/// filtered out. When absent, no per-event filtering is applied.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value_path: Option<AuthValuePath>,

/// When `true`, requests without an `authorization` header are rejected
/// with `Unauthenticated`. Defaults to `true` (secure by default).
///
/// Set to `false` to opt into the legacy fallback that accepts requests
/// without a token (useful during a staged migration where older agents
/// haven't been updated yet).
///
/// Applies to both `push_events` and `health_check` RPCs so a sink with
/// auth misconfigured fails its healthcheck rather than silently
/// bypassing token validation.
#[serde(default = "default_require_token")]
pub require_token: bool,
}

fn default_require_token() -> bool {
true
}

/// Replace serde's flattened-enum error with an actionable message naming the
Expand Down Expand Up @@ -972,7 +968,6 @@ impl AuthConfig {
jwks_validations,
membership_claim,
value_path,
require_token: self.require_token,
})))
}
}
Expand Down Expand Up @@ -1003,7 +998,6 @@ struct Inner {
jwks_validations: HashMap<Algorithm, Validation>,
membership_claim: Option<MembershipClaim>,
value_path: Option<CompiledValuePath>,
require_token: bool,
}

/// Per-request auth context returned by a successful [`Auth::authenticate`] call.
Expand Down Expand Up @@ -1146,25 +1140,23 @@ impl Auth {
///
/// # Returns
///
/// * `Ok(None)` — `authorization` was absent and `require_token` is `false`; request is
/// accepted without per-event filtering (legacy / migration mode).
/// * `Ok(Some(ctx))` — token is valid. Use [`AuthContext::is_authorized`] for per-event
/// membership checks against the extracted allowed-values list.
/// * `Err(AuthError::InvalidToken)` — `authorization` was absent but required, the token
/// * `Err(AuthError::InvalidToken)` — `authorization` was absent (a token is always
/// required — OBE-9898 removed the legacy `require_token = false` bypass), or the token
/// is malformed/expired/bad-signature, wrong issuer/audience, unsupported algorithm,
/// or the membership claim is missing.
pub async fn authenticate(
&self,
authorization: Option<&str>,
) -> Result<Option<AuthContext>, AuthError> {
let Some(auth_value) = authorization else {
if self.0.require_token {
return Err(AuthError::InvalidToken(
"authorization header is required",
));
}
debug!(message = "No authorization header; allowing request.");
return Ok(None);
// A token is always required (OBE-9898 removed the legacy
// `require_token = false` bypass that let token-less sinks through —
// it also let *upgraded* sites skip auth, which this closes).
return Err(AuthError::InvalidToken(
"authorization header is required",
));
};

let token = strip_bearer_prefix(auth_value)
Expand Down Expand Up @@ -1219,6 +1211,13 @@ impl Auth {
}
};

// Surface the site's version (OBE-9896 stamps `site_version` as a JWT
// claim) for telemetry / future per-version policy. Absent for older
// sites whose tokens predate the claim.
if let Some(site_version) = extract_site_version(&token_data.claims) {
debug!(message = "Authenticated site push.", site_version = %site_version);
}

let allowed_values = inner.membership_claim
.as_ref()
.map(|c| c.extract(&token_data.claims))
Expand Down Expand Up @@ -1272,8 +1271,8 @@ mod tests {
};

// Construct a baseline `AuthConfig` from the given authority, using the
// permissive defaults the tests below want (no issuer/audience/value_path,
// `require_token = false`). Individual tests override fields as needed.
// permissive defaults the tests below want (no issuer/audience/value_path).
// Individual tests override fields as needed.
fn cfg_with(authority: Authority) -> AuthConfig {
AuthConfig {
authority,
Expand All @@ -1282,7 +1281,6 @@ mod tests {
membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())),
value_path: None,
algorithms: None,
require_token: false,
}
}

Expand Down Expand Up @@ -1398,15 +1396,6 @@ mod tests {

// ── Auth::authenticate ───────────────────────────────────────────────────

#[tokio::test]
async fn no_auth_header_allows_legacy_client_when_require_token_false() {
// The shared `build_auth` helper now matches production default
// (require_token = true), so explicitly opt out to test legacy mode.
let auth = build_auth_with_require_token(false).await;
let result = auth.authenticate(None).await;
assert!(matches!(result, Ok(None)));
}

#[tokio::test]
async fn valid_token_returns_allowed_values() {
let auth = build_auth(None, None).await;
Expand Down Expand Up @@ -1659,41 +1648,74 @@ mod tests {
assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok());
}

// ── require_token enforcement ────────────────────────────────────────────

async fn build_auth_with_require_token(require: bool) -> Auth {
let mut cfg = cfg_with(inline_public_key());
cfg.require_token = require;
cfg.build().await.unwrap()
}
// ── token is always required (OBE-9898) ──────────────────────────────────

#[tokio::test]
async fn require_token_false_allows_missing_authorization() {
let auth = build_auth_with_require_token(false).await;
assert!(matches!(auth.authenticate(None).await, Ok(None)));
}

#[tokio::test]
async fn require_token_true_rejects_missing_authorization() {
let auth = build_auth_with_require_token(true).await;
async fn missing_authorization_is_rejected() {
// No `require_token = false` escape hatch any more: a missing header is
// always rejected, so upgraded sites can never skip auth.
let auth = build_auth(None, None).await;
let result = auth.authenticate(None).await;
assert!(matches!(result, Err(AuthError::InvalidToken(_))));
}

#[tokio::test]
async fn require_token_true_accepts_valid_token() {
let auth = build_auth_with_require_token(true).await;
async fn valid_token_is_accepted() {
let auth = build_auth(None, None).await;
let token = make_token(HashMap::new());
assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok());
}

#[tokio::test]
async fn require_token_true_still_rejects_invalid_token() {
let auth = build_auth_with_require_token(true).await;
async fn invalid_token_is_rejected() {
let auth = build_auth(None, None).await;
let result = auth.authenticate(Some("Bearer not.a.jwt")).await;
assert!(matches!(result, Err(AuthError::InvalidToken(_))));
}

// ── site_version claim (OBE-9896 ↔ OBE-9898 contract) ─────────────────────

#[test]
fn site_version_claim_name_is_stable() {
// The manager (OBE-9896) stamps this exact claim name onto every site
// token; a rename here silently breaks cross-repo compatibility. Keep in
// lockstep with the manager's `SiteVersionClaim` ("site_version").
assert_eq!(SITE_VERSION_CLAIM, "site_version");
}

#[test]
fn extract_site_version_reads_the_claim() {
let mut claims = Claims::new();
claims.insert(SITE_VERSION_CLAIM.to_string(), Value::from("2.42.0"));
assert_eq!(extract_site_version(&claims), Some("2.42.0"));
}

#[test]
fn extract_site_version_absent_is_none() {
// Older sites whose tokens predate the claim must not error out.
let claims = Claims::new();
assert_eq!(extract_site_version(&claims), None);
}

#[test]
fn extract_site_version_wrong_type_is_none() {
// A non-string value is ignored rather than mis-parsed.
let mut claims = Claims::new();
claims.insert(SITE_VERSION_CLAIM.to_string(), Value::from(42));
assert_eq!(extract_site_version(&claims), None);
}

#[tokio::test]
async fn authenticate_surfaces_site_version_when_present() {
// End-to-end: a real signed token carrying `site_version` validates and
// the claim is read off it (the OBE-9896 → OBE-9898 hand-off).
let auth = build_auth(None, None).await;
let mut extra = HashMap::new();
extra.insert(SITE_VERSION_CLAIM, Value::from("2.42.0"));
let token = make_token(extra);
assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok());
}

#[test]
fn default_algorithms_covers_rs_and_ps_family() {
let algos = default_algorithms();
Expand Down Expand Up @@ -1729,7 +1751,6 @@ mod tests {
async fn build_no_membership_auth() -> Auth {
let mut cfg = cfg_with(inline_public_key());
cfg.membership_claim = None;
cfg.require_token = true;
cfg.build().await.unwrap()
}

Expand Down Expand Up @@ -2085,7 +2106,6 @@ pub_key.type = "inline"
pub_key.value = "pem"
membership_claim = "tenants"
issuer = "https://issuer.example.com/"
require_token = false
"#;
let cfg: AuthConfig = toml::from_str(toml).unwrap();
assert!(matches!(
Expand All @@ -2094,7 +2114,6 @@ require_token = false
));
assert_eq!(cfg.membership_claim, Some(MembershipClaimConfig::Identity("tenants".to_string())));
assert_eq!(cfg.issuer.as_deref(), Some("https://issuer.example.com/"));
assert!(!cfg.require_token);
}

// ── MembershipClaimConfig serde ──────────────────────────────────────────
Expand Down Expand Up @@ -2386,7 +2405,6 @@ pub_key.value = "pem"
membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())),
value_path: None,
algorithms: None,
require_token: false,
}
}

Expand Down
Loading