Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions faststream_outbox/subscriber/config.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,51 @@
import sys
import typing
import warnings
from dataclasses import dataclass
from pathlib import Path

import faststream
from faststream._internal.configs import SubscriberSpecificationConfig, SubscriberUsecaseConfig
from faststream._internal.constants import EMPTY
from faststream.middlewares import AckPolicy

from faststream_outbox.retry import NoRetry


if typing.TYPE_CHECKING:
from faststream_outbox.configs import OutboxBrokerConfig
from faststream_outbox.retry import RetryStrategyProto


# Frames to walk past when attributing a subscriber-config warning to the user's call site:
# this package, faststream, and the dataclass-generated ``__init__`` (its ``co_filename`` is
# the literal ``"<string>"``) that calls ``__post_init__``.
_PKG_DIR = str(Path(__file__).parent.parent) # the faststream_outbox package dir
_FASTSTREAM_DIR = str(Path(faststream.__file__).parent) # the faststream package dir


def _is_internal_frame(filename: str) -> bool:
return filename == "<string>" or filename.startswith((_PKG_DIR, _FASTSTREAM_DIR))


def _subscriber_warn(message: str) -> None:
"""
Attribute a subscriber-config ``UserWarning`` to the user's ``@subscriber`` call (P27).

Computes ``stacklevel`` by walking out to the first non-internal frame instead of using
``warnings.warn(skip_file_prefixes=...)``: the 3.13 C ``warn`` does not skip the
``"<string>"`` dataclass-``__init__`` frame between ``__post_init__`` and the user (it
works on 3.14), and a static ``stacklevel`` can't be right for both the direct and the
FastAPI-router paths (they differ in frame count).
"""
stacklevel = 2 # frame above this helper's caller (``_validate``)
frame: typing.Any = sys._getframe(1) # noqa: SLF001 # the caller, ``_validate``
while frame is not None and _is_internal_frame(frame.f_code.co_filename):
frame = frame.f_back
stacklevel += 1
warnings.warn(message, UserWarning, stacklevel=stacklevel)


@dataclass(kw_only=True)
class OutboxSubscriberConfig(SubscriberUsecaseConfig):
_outer_config: "OutboxBrokerConfig"
Expand All @@ -24,6 +59,92 @@ class OutboxSubscriberConfig(SubscriberUsecaseConfig):
max_deliveries: int | None
propagate_inbound_headers: bool

def __post_init__(self) -> None:
# Validation lives here (not in the factory) so *every* construction path —
# ``@broker.subscriber`` / ``@router.subscriber`` / direct construction — is
# validated; there is no way to build a misconfigured config that skips the guards.
# Upstream-divergence watch: ``SubscriberUsecaseConfig`` has no ``__post_init__``
# today, but dataclasses call only the most-derived one — if faststream adds init
# logic there, this guarded call keeps it running instead of silently shadowing it.
parent_post_init = getattr(super(), "__post_init__", None)
if parent_post_init is not None: # pragma: no cover # defensive: base has none today
parent_post_init()
self._validate()

def _validate(self) -> None: # noqa: C901 # flat sequence of independent knob checks
"""
Reject impossible knob values, warn on combos that silently misbehave.

Errors are raised here (not deferred to runtime) so the user gets a traceback
pointing at the ``@broker.subscriber(...)`` decorator. Warnings use
``skip_file_prefixes`` (see ``_WARN_SKIP_PREFIXES``) so they are attributed to the
user's call site on both the direct and FastAPI-router paths (P27).
"""
# EMPTY means "ack_policy not passed" — map it back to None so the checks below
# match on the *explicitly-passed* policy exactly as the factory-side validation
# did (e.g. the NACK+NoRetry warning fires only on an explicit NACK_ON_ERROR, not
# on the default that resolves to NACK_ON_ERROR via the ``ack_policy`` property).
ack_policy = None if self._ack_policy is EMPTY else self._ack_policy
if self.max_workers <= 0:
msg = f"max_workers must be >= 1, got {self.max_workers}"
raise ValueError(msg)
if self.fetch_batch_size <= 0:
msg = f"fetch_batch_size must be >= 1, got {self.fetch_batch_size}"
raise ValueError(msg)
# P12: non-positive intervals/TTL turn the adaptive backoff into a busy-poll (or an
# instantly-expiring lease). Reject up front rather than spin a hot loop at runtime.
if self.min_fetch_interval <= 0:
msg = f"min_fetch_interval must be > 0, got {self.min_fetch_interval}"
raise ValueError(msg)
if self.max_fetch_interval <= 0:
msg = f"max_fetch_interval must be > 0, got {self.max_fetch_interval}"
raise ValueError(msg)
if self.lease_ttl_seconds <= 0:
msg = f"lease_ttl_seconds must be > 0, got {self.lease_ttl_seconds}"
raise ValueError(msg)
if self.min_fetch_interval > self.max_fetch_interval:
msg = (
f"min_fetch_interval ({self.min_fetch_interval}) must be <= max_fetch_interval "
f"({self.max_fetch_interval}); the adaptive idle backoff grows from ~min_fetch_interval "
f"(the base interval, with ±50% jitter) up to max_fetch_interval (the ceiling)."
)
raise ValueError(msg)
is_no_retry = isinstance(self.retry_strategy, NoRetry)
if ack_policy is AckPolicy.ACK_FIRST:
msg = (
"ack_policy=AckPolicy.ACK_FIRST is not supported by the outbox broker: it "
"deletes the row before the handler runs, so a handler crash silently drops "
"the message — defeating the outbox reliability guarantee. Use NACK_ON_ERROR "
"(default, retries via retry_strategy), REJECT_ON_ERROR (delete on first "
"failure, no retry), or MANUAL (handler calls msg.ack()/nack()/reject() itself)."
)
raise ValueError(msg)
if ack_policy is AckPolicy.REJECT_ON_ERROR and self.retry_strategy is not None and not is_no_retry:
_subscriber_warn(
"ack_policy=REJECT_ON_ERROR rejects on the first handler error; the "
"retry_strategy is ignored. Pass ack_policy=NACK_ON_ERROR (default) to "
"honor retry, or drop retry_strategy if you really want first-error deletion.",
)
if ack_policy is AckPolicy.NACK_ON_ERROR and is_no_retry:
_subscriber_warn(
"ack_policy=NACK_ON_ERROR with retry_strategy=NoRetry() has the same effect "
"as REJECT_ON_ERROR (one attempt, then delete). Pick one for clarity.",
)
if self.max_deliveries is not None and (self.retry_strategy is None or is_no_retry):
_subscriber_warn(
"max_deliveries is set but no retry_strategy is configured (or NoRetry was "
"passed); the delivery cap is unreachable on the happy path since the row "
"is deleted after the first attempt.",
)
if self.lease_ttl_seconds <= self.max_fetch_interval:
_subscriber_warn(
f"lease_ttl_seconds ({self.lease_ttl_seconds}) <= max_fetch_interval "
f"({self.max_fetch_interval}): a lease can expire during a single idle wait "
f"before the next fetch even runs, causing spurious lease-expiry reclaim "
f"of healthy in-flight rows. Recommended: lease_ttl_seconds >= "
f"2 * max_fetch_interval + P99(handler).",
)

@property
def ack_policy(self) -> AckPolicy:
if self._ack_policy is EMPTY:
Expand Down
114 changes: 2 additions & 112 deletions faststream_outbox/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import typing
import warnings
from pathlib import Path

import faststream
from faststream._internal.constants import EMPTY
from faststream._internal.endpoint.subscriber.call_item import CallsCollection
from faststream.middlewares import AckPolicy

from faststream_outbox.retry import NoRetry
from faststream_outbox.subscriber.config import OutboxSubscriberConfig, OutboxSubscriberSpecificationConfig
from faststream_outbox.subscriber.usecase import OutboxSubscriber, OutboxSubscriberSpecification

Expand All @@ -17,16 +13,6 @@
from faststream_outbox.retry import RetryStrategyProto


# P27: attribute the subscriber-config warnings to the user's ``@broker.subscriber(...)``
# / ``@router.subscriber(...)`` call site by skipping frames inside this package and
# faststream. A static ``stacklevel`` can't be correct for both the direct path and the
# FastAPI-router path (the router adds frames); ``skip_file_prefixes`` (3.12+) is.
_WARN_SKIP_PREFIXES = (
str(Path(__file__).parent.parent), # the faststream_outbox package dir
str(Path(faststream.__file__).parent), # the faststream package dir
)


def create_subscriber(
*,
queues: list[str],
Expand All @@ -44,16 +30,8 @@ def create_subscriber(
description_: str | None = None,
include_in_schema: bool = True,
) -> OutboxSubscriber:
_validate_subscriber_config(
max_workers=max_workers,
fetch_batch_size=fetch_batch_size,
min_fetch_interval=min_fetch_interval,
max_fetch_interval=max_fetch_interval,
lease_ttl_seconds=lease_ttl_seconds,
max_deliveries=max_deliveries,
ack_policy=ack_policy,
retry_strategy=retry_strategy,
)
# Knob validation lives in OutboxSubscriberConfig.__post_init__ — constructing the
# config below validates it, so every construction path is guarded (not just this one).
usecase_config = OutboxSubscriberConfig(
_outer_config=config,
_ack_policy=ack_policy if ack_policy is not None else EMPTY,
Expand Down Expand Up @@ -84,91 +62,3 @@ def create_subscriber(
specification=specification,
calls=calls,
)


def _validate_subscriber_config( # noqa: C901 # flat sequence of independent knob checks
*,
max_workers: int,
fetch_batch_size: int,
min_fetch_interval: float,
max_fetch_interval: float,
lease_ttl_seconds: float,
max_deliveries: int | None,
ack_policy: AckPolicy | None,
retry_strategy: "RetryStrategyProto | None",
) -> None:
"""
Reject impossible knob values, warn on combos that silently misbehave.

Errors are raised here (not deferred to runtime) so the user gets a
traceback pointing at the ``@broker.subscriber(...)`` decorator. Warnings use
``skip_file_prefixes`` (see ``_WARN_SKIP_PREFIXES``) so they are attributed to the
user's call site on both the direct and FastAPI-router paths (P27).
"""
if max_workers <= 0:
msg = f"max_workers must be >= 1, got {max_workers}"
raise ValueError(msg)
if fetch_batch_size <= 0:
msg = f"fetch_batch_size must be >= 1, got {fetch_batch_size}"
raise ValueError(msg)
# P12: non-positive intervals/TTL turn the adaptive backoff into a busy-poll (or an
# instantly-expiring lease). Reject up front rather than spin a hot loop at runtime.
if min_fetch_interval <= 0:
msg = f"min_fetch_interval must be > 0, got {min_fetch_interval}"
raise ValueError(msg)
if max_fetch_interval <= 0:
msg = f"max_fetch_interval must be > 0, got {max_fetch_interval}"
raise ValueError(msg)
if lease_ttl_seconds <= 0:
msg = f"lease_ttl_seconds must be > 0, got {lease_ttl_seconds}"
raise ValueError(msg)
if min_fetch_interval > max_fetch_interval:
msg = (
f"min_fetch_interval ({min_fetch_interval}) must be <= max_fetch_interval "
f"({max_fetch_interval}); the adaptive idle backoff grows from ~min_fetch_interval "
f"(the base interval, with ±50% jitter) up to max_fetch_interval (the ceiling)."
)
raise ValueError(msg)
is_no_retry = isinstance(retry_strategy, NoRetry)
if ack_policy is AckPolicy.ACK_FIRST:
msg = (
"ack_policy=AckPolicy.ACK_FIRST is not supported by the outbox broker: it "
"deletes the row before the handler runs, so a handler crash silently drops "
"the message — defeating the outbox reliability guarantee. Use NACK_ON_ERROR "
"(default, retries via retry_strategy), REJECT_ON_ERROR (delete on first "
"failure, no retry), or MANUAL (handler calls msg.ack()/nack()/reject() itself)."
)
raise ValueError(msg)
if ack_policy is AckPolicy.REJECT_ON_ERROR and retry_strategy is not None and not is_no_retry:
warnings.warn(
"ack_policy=REJECT_ON_ERROR rejects on the first handler error; the "
"retry_strategy is ignored. Pass ack_policy=NACK_ON_ERROR (default) to "
"honor retry, or drop retry_strategy if you really want first-error deletion.",
UserWarning,
skip_file_prefixes=_WARN_SKIP_PREFIXES,
)
if ack_policy is AckPolicy.NACK_ON_ERROR and is_no_retry:
warnings.warn(
"ack_policy=NACK_ON_ERROR with retry_strategy=NoRetry() has the same effect "
"as REJECT_ON_ERROR (one attempt, then delete). Pick one for clarity.",
UserWarning,
skip_file_prefixes=_WARN_SKIP_PREFIXES,
)
if max_deliveries is not None and (retry_strategy is None or is_no_retry):
warnings.warn(
"max_deliveries is set but no retry_strategy is configured (or NoRetry was "
"passed); the delivery cap is unreachable on the happy path since the row "
"is deleted after the first attempt.",
UserWarning,
skip_file_prefixes=_WARN_SKIP_PREFIXES,
)
if lease_ttl_seconds <= max_fetch_interval:
warnings.warn(
f"lease_ttl_seconds ({lease_ttl_seconds}) <= max_fetch_interval "
f"({max_fetch_interval}): a lease can expire during a single idle wait "
f"before the next fetch even runs, causing spurious lease-expiry reclaim "
f"of healthy in-flight rows. Recommended: lease_ttl_seconds >= "
f"2 * max_fetch_interval + P99(handler).",
UserWarning,
skip_file_prefixes=_WARN_SKIP_PREFIXES,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
---
status: shipped
date: 2026-06-23
slug: self-validating-subscriber-config
summary: Move subscriber-knob validation from the factory into OutboxSubscriberConfig.__post_init__ so every construction path is validated, not just the factory's.
supersedes: null
superseded_by: null
pr: 111
outcome: |
Landed. Validation moved to OutboxSubscriberConfig.__post_init__ (guarded super-call +
self._validate()); factory.py dropped _validate_subscriber_config and now just wires.
Behavior preserved exactly (EMPTY→None ack_policy mapping). One wrinkle surfaced under
CI: moving validation under the dataclass-generated __init__ added a "<string>" frame
that the 3.13 C warnings.warn(skip_file_prefixes=...) refuses to skip (works on 3.14),
so the FastAPI-router attribution test failed in docker. Replaced skip_file_prefixes
with a manual stacklevel walk (_subscriber_warn) that's version- and call-path-robust.
Existing validation tests passed as the regression guard; full suite 543 passed at 100%.
---

# Change: Make the subscriber config validate itself

**Lane:** lightweight — 2 files, net-neutral LOC (a relocation), no new file, no
public-API change, existing tests cover it.

## Goal

Subscriber-knob validation lived in `subscriber/factory.py::_validate_subscriber_config`,
called from `create_subscriber`. `OutboxSubscriberConfig` was a bare `@dataclass` with no
`__post_init__`, so constructing it directly bypassed every guard. Move validation into
`OutboxSubscriberConfig.__post_init__` so *every* construction path — `@broker.subscriber`,
`@router.subscriber`, direct construction — is validated; there is no way in that skips it.

This is candidate #3 from the 2026-06-23 architecture review.

## Approach

- `OutboxSubscriberConfig.__post_init__` does a guarded `super().__post_init__()` (upstream
`SubscriberUsecaseConfig` has none today, but dataclasses call only the most-derived one —
the guard keeps a future upstream init running) then `self._validate()`.
- `_validate()` holds the relocated checks. Behavior is **preserved exactly**: it maps
`_ack_policy is EMPTY → None` so the checks match on the *explicitly-passed* policy as the
factory-side validation did (e.g. the NACK+NoRetry advisory still fires only on an explicit
`NACK_ON_ERROR`, not on the default that resolves to it via the `ack_policy` property).
- `create_subscriber` drops the validation call and the function; it just wires now.

**Warning attribution (version-robustness).** Moving validation under the dataclass-generated
`__init__` inserts a frame whose `co_filename` is the literal `"<string>"` between the user's
call and `__post_init__`. The 3.13 C `warnings.warn(skip_file_prefixes=...)` does **not** skip
that frame (3.14 does), so warnings mis-attributed to `<string>` and the FastAPI-router
attribution test failed under docker/3.13. Replaced `skip_file_prefixes` with `_subscriber_warn`,
which computes `stacklevel` by walking out to the first non-internal frame (this package /
faststream / `<string>`) — robust across CPython versions and across the direct vs router paths
(which differ in frame count).

**Why minimal — broader "config validates everything" not pursued.** Only subscriber-knob
validation moved. Other config objects (`OutboxBrokerConfig`, publisher config) were not
swept in; this change is scoped to the one shallow factory the review flagged.

## Files

- `faststream_outbox/subscriber/config.py` — add `__post_init__` + `_validate` +
`_subscriber_warn` (manual-stacklevel attribution).
- `faststream_outbox/subscriber/factory.py` — remove `_validate_subscriber_config` and its
call; keep construction/wiring.

## Verification

- [x] Existing tests are the regression guard (unchanged): `test_unit.py` subscriber
reject/warn/no-warning cases (via `@broker.subscriber`), `test_fastapi.py`
`test_subscriber_misconfig_warning_attributed_to_user_via_fastapi_router` (router path
attribution), `test_fake.py::test_subscriber_config_ack_policy_returns_explicit_value`
(direct construction).
- [x] `just test` — 543 passed, 100% coverage (docker / Python 3.13).
- [x] `just lint-ci` — clean.