From a4a953fd8bfbbd10507d61cc2e974d1cd33bdabf Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 18:55:04 +0300 Subject: [PATCH 1/2] refactor(subscriber): make OutboxSubscriberConfig validate itself Move subscriber-knob validation from factory._validate_subscriber_config into OutboxSubscriberConfig.__post_init__ (guarded super-call + _validate) so every construction path is guarded, not just the factory's. Behavior preserved exactly (EMPTY->None ack_policy mapping). Replace skip_file_prefixes with a manual stacklevel walk (_subscriber_warn) since the 3.13 C warn won't skip the dataclass __init__ '' frame now between __post_init__ and the user. Co-Authored-By: Claude Opus 4.8 (1M context) --- faststream_outbox/subscriber/config.py | 121 ++++++++++++++++++ faststream_outbox/subscriber/factory.py | 114 +---------------- .../change.md | 74 +++++++++++ 3 files changed, 197 insertions(+), 112 deletions(-) create mode 100644 planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md diff --git a/faststream_outbox/subscriber/config.py b/faststream_outbox/subscriber/config.py index 7ce5cd8..a08d996 100644 --- a/faststream_outbox/subscriber/config.py +++ b/faststream_outbox/subscriber/config.py @@ -1,16 +1,51 @@ +import sys import typing +import warnings from dataclasses import dataclass +from pathlib import Path +import faststream from faststream._internal.configs import SubscriberSpecificationConfig, SubscriberUsecaseConfig from faststream._internal.constants import EMPTY from faststream.middlewares import AckPolicy +from faststream_outbox.retry import NoRetry + if typing.TYPE_CHECKING: from faststream_outbox.configs import OutboxBrokerConfig from faststream_outbox.retry import RetryStrategyProto +# Frames to walk past when attributing a subscriber-config warning to the user's call site: +# this package, faststream, and the dataclass-generated ``__init__`` (its ``co_filename`` is +# the literal ``""``) that calls ``__post_init__``. +_PKG_DIR = str(Path(__file__).parent.parent) # the faststream_outbox package dir +_FASTSTREAM_DIR = str(Path(faststream.__file__).parent) # the faststream package dir + + +def _is_internal_frame(filename: str) -> bool: + return filename == "" or filename.startswith((_PKG_DIR, _FASTSTREAM_DIR)) + + +def _subscriber_warn(message: str) -> None: + """ + Attribute a subscriber-config ``UserWarning`` to the user's ``@subscriber`` call (P27). + + Computes ``stacklevel`` by walking out to the first non-internal frame instead of using + ``warnings.warn(skip_file_prefixes=...)``: the 3.13 C ``warn`` does not skip the + ``""`` dataclass-``__init__`` frame between ``__post_init__`` and the user (it + works on 3.14), and a static ``stacklevel`` can't be right for both the direct and the + FastAPI-router paths (they differ in frame count). + """ + stacklevel = 2 # frame above this helper's caller (``_validate``) + frame: typing.Any = sys._getframe(1) # noqa: SLF001 # the caller, ``_validate`` + while frame is not None and _is_internal_frame(frame.f_code.co_filename): + frame = frame.f_back + stacklevel += 1 + warnings.warn(message, UserWarning, stacklevel=stacklevel) + + @dataclass(kw_only=True) class OutboxSubscriberConfig(SubscriberUsecaseConfig): _outer_config: "OutboxBrokerConfig" @@ -24,6 +59,92 @@ class OutboxSubscriberConfig(SubscriberUsecaseConfig): max_deliveries: int | None propagate_inbound_headers: bool + def __post_init__(self) -> None: + # Validation lives here (not in the factory) so *every* construction path — + # ``@broker.subscriber`` / ``@router.subscriber`` / direct construction — is + # validated; there is no way to build a misconfigured config that skips the guards. + # Upstream-divergence watch: ``SubscriberUsecaseConfig`` has no ``__post_init__`` + # today, but dataclasses call only the most-derived one — if faststream adds init + # logic there, this guarded call keeps it running instead of silently shadowing it. + parent_post_init = getattr(super(), "__post_init__", None) + if parent_post_init is not None: # pragma: no cover # defensive: base has none today + parent_post_init() + self._validate() + + def _validate(self) -> None: # noqa: C901 # flat sequence of independent knob checks + """ + Reject impossible knob values, warn on combos that silently misbehave. + + Errors are raised here (not deferred to runtime) so the user gets a traceback + pointing at the ``@broker.subscriber(...)`` decorator. Warnings use + ``skip_file_prefixes`` (see ``_WARN_SKIP_PREFIXES``) so they are attributed to the + user's call site on both the direct and FastAPI-router paths (P27). + """ + # EMPTY means "ack_policy not passed" — map it back to None so the checks below + # match on the *explicitly-passed* policy exactly as the factory-side validation + # did (e.g. the NACK+NoRetry warning fires only on an explicit NACK_ON_ERROR, not + # on the default that resolves to NACK_ON_ERROR via the ``ack_policy`` property). + ack_policy = None if self._ack_policy is EMPTY else self._ack_policy + if self.max_workers <= 0: + msg = f"max_workers must be >= 1, got {self.max_workers}" + raise ValueError(msg) + if self.fetch_batch_size <= 0: + msg = f"fetch_batch_size must be >= 1, got {self.fetch_batch_size}" + raise ValueError(msg) + # P12: non-positive intervals/TTL turn the adaptive backoff into a busy-poll (or an + # instantly-expiring lease). Reject up front rather than spin a hot loop at runtime. + if self.min_fetch_interval <= 0: + msg = f"min_fetch_interval must be > 0, got {self.min_fetch_interval}" + raise ValueError(msg) + if self.max_fetch_interval <= 0: + msg = f"max_fetch_interval must be > 0, got {self.max_fetch_interval}" + raise ValueError(msg) + if self.lease_ttl_seconds <= 0: + msg = f"lease_ttl_seconds must be > 0, got {self.lease_ttl_seconds}" + raise ValueError(msg) + if self.min_fetch_interval > self.max_fetch_interval: + msg = ( + f"min_fetch_interval ({self.min_fetch_interval}) must be <= max_fetch_interval " + f"({self.max_fetch_interval}); the adaptive idle backoff grows from ~min_fetch_interval " + f"(the base interval, with ±50% jitter) up to max_fetch_interval (the ceiling)." + ) + raise ValueError(msg) + is_no_retry = isinstance(self.retry_strategy, NoRetry) + if ack_policy is AckPolicy.ACK_FIRST: + msg = ( + "ack_policy=AckPolicy.ACK_FIRST is not supported by the outbox broker: it " + "deletes the row before the handler runs, so a handler crash silently drops " + "the message — defeating the outbox reliability guarantee. Use NACK_ON_ERROR " + "(default, retries via retry_strategy), REJECT_ON_ERROR (delete on first " + "failure, no retry), or MANUAL (handler calls msg.ack()/nack()/reject() itself)." + ) + raise ValueError(msg) + if ack_policy is AckPolicy.REJECT_ON_ERROR and self.retry_strategy is not None and not is_no_retry: + _subscriber_warn( + "ack_policy=REJECT_ON_ERROR rejects on the first handler error; the " + "retry_strategy is ignored. Pass ack_policy=NACK_ON_ERROR (default) to " + "honor retry, or drop retry_strategy if you really want first-error deletion.", + ) + if ack_policy is AckPolicy.NACK_ON_ERROR and is_no_retry: + _subscriber_warn( + "ack_policy=NACK_ON_ERROR with retry_strategy=NoRetry() has the same effect " + "as REJECT_ON_ERROR (one attempt, then delete). Pick one for clarity.", + ) + if self.max_deliveries is not None and (self.retry_strategy is None or is_no_retry): + _subscriber_warn( + "max_deliveries is set but no retry_strategy is configured (or NoRetry was " + "passed); the delivery cap is unreachable on the happy path since the row " + "is deleted after the first attempt.", + ) + if self.lease_ttl_seconds <= self.max_fetch_interval: + _subscriber_warn( + f"lease_ttl_seconds ({self.lease_ttl_seconds}) <= max_fetch_interval " + f"({self.max_fetch_interval}): a lease can expire during a single idle wait " + f"before the next fetch even runs, causing spurious lease-expiry reclaim " + f"of healthy in-flight rows. Recommended: lease_ttl_seconds >= " + f"2 * max_fetch_interval + P99(handler).", + ) + @property def ack_policy(self) -> AckPolicy: if self._ack_policy is EMPTY: diff --git a/faststream_outbox/subscriber/factory.py b/faststream_outbox/subscriber/factory.py index efb6295..b0b653a 100644 --- a/faststream_outbox/subscriber/factory.py +++ b/faststream_outbox/subscriber/factory.py @@ -1,13 +1,9 @@ import typing -import warnings -from pathlib import Path -import faststream from faststream._internal.constants import EMPTY from faststream._internal.endpoint.subscriber.call_item import CallsCollection from faststream.middlewares import AckPolicy -from faststream_outbox.retry import NoRetry from faststream_outbox.subscriber.config import OutboxSubscriberConfig, OutboxSubscriberSpecificationConfig from faststream_outbox.subscriber.usecase import OutboxSubscriber, OutboxSubscriberSpecification @@ -17,16 +13,6 @@ from faststream_outbox.retry import RetryStrategyProto -# P27: attribute the subscriber-config warnings to the user's ``@broker.subscriber(...)`` -# / ``@router.subscriber(...)`` call site by skipping frames inside this package and -# faststream. A static ``stacklevel`` can't be correct for both the direct path and the -# FastAPI-router path (the router adds frames); ``skip_file_prefixes`` (3.12+) is. -_WARN_SKIP_PREFIXES = ( - str(Path(__file__).parent.parent), # the faststream_outbox package dir - str(Path(faststream.__file__).parent), # the faststream package dir -) - - def create_subscriber( *, queues: list[str], @@ -44,16 +30,8 @@ def create_subscriber( description_: str | None = None, include_in_schema: bool = True, ) -> OutboxSubscriber: - _validate_subscriber_config( - max_workers=max_workers, - fetch_batch_size=fetch_batch_size, - min_fetch_interval=min_fetch_interval, - max_fetch_interval=max_fetch_interval, - lease_ttl_seconds=lease_ttl_seconds, - max_deliveries=max_deliveries, - ack_policy=ack_policy, - retry_strategy=retry_strategy, - ) + # Knob validation lives in OutboxSubscriberConfig.__post_init__ — constructing the + # config below validates it, so every construction path is guarded (not just this one). usecase_config = OutboxSubscriberConfig( _outer_config=config, _ack_policy=ack_policy if ack_policy is not None else EMPTY, @@ -84,91 +62,3 @@ def create_subscriber( specification=specification, calls=calls, ) - - -def _validate_subscriber_config( # noqa: C901 # flat sequence of independent knob checks - *, - max_workers: int, - fetch_batch_size: int, - min_fetch_interval: float, - max_fetch_interval: float, - lease_ttl_seconds: float, - max_deliveries: int | None, - ack_policy: AckPolicy | None, - retry_strategy: "RetryStrategyProto | None", -) -> None: - """ - Reject impossible knob values, warn on combos that silently misbehave. - - Errors are raised here (not deferred to runtime) so the user gets a - traceback pointing at the ``@broker.subscriber(...)`` decorator. Warnings use - ``skip_file_prefixes`` (see ``_WARN_SKIP_PREFIXES``) so they are attributed to the - user's call site on both the direct and FastAPI-router paths (P27). - """ - if max_workers <= 0: - msg = f"max_workers must be >= 1, got {max_workers}" - raise ValueError(msg) - if fetch_batch_size <= 0: - msg = f"fetch_batch_size must be >= 1, got {fetch_batch_size}" - raise ValueError(msg) - # P12: non-positive intervals/TTL turn the adaptive backoff into a busy-poll (or an - # instantly-expiring lease). Reject up front rather than spin a hot loop at runtime. - if min_fetch_interval <= 0: - msg = f"min_fetch_interval must be > 0, got {min_fetch_interval}" - raise ValueError(msg) - if max_fetch_interval <= 0: - msg = f"max_fetch_interval must be > 0, got {max_fetch_interval}" - raise ValueError(msg) - if lease_ttl_seconds <= 0: - msg = f"lease_ttl_seconds must be > 0, got {lease_ttl_seconds}" - raise ValueError(msg) - if min_fetch_interval > max_fetch_interval: - msg = ( - f"min_fetch_interval ({min_fetch_interval}) must be <= max_fetch_interval " - f"({max_fetch_interval}); the adaptive idle backoff grows from ~min_fetch_interval " - f"(the base interval, with ±50% jitter) up to max_fetch_interval (the ceiling)." - ) - raise ValueError(msg) - is_no_retry = isinstance(retry_strategy, NoRetry) - if ack_policy is AckPolicy.ACK_FIRST: - msg = ( - "ack_policy=AckPolicy.ACK_FIRST is not supported by the outbox broker: it " - "deletes the row before the handler runs, so a handler crash silently drops " - "the message — defeating the outbox reliability guarantee. Use NACK_ON_ERROR " - "(default, retries via retry_strategy), REJECT_ON_ERROR (delete on first " - "failure, no retry), or MANUAL (handler calls msg.ack()/nack()/reject() itself)." - ) - raise ValueError(msg) - if ack_policy is AckPolicy.REJECT_ON_ERROR and retry_strategy is not None and not is_no_retry: - warnings.warn( - "ack_policy=REJECT_ON_ERROR rejects on the first handler error; the " - "retry_strategy is ignored. Pass ack_policy=NACK_ON_ERROR (default) to " - "honor retry, or drop retry_strategy if you really want first-error deletion.", - UserWarning, - skip_file_prefixes=_WARN_SKIP_PREFIXES, - ) - if ack_policy is AckPolicy.NACK_ON_ERROR and is_no_retry: - warnings.warn( - "ack_policy=NACK_ON_ERROR with retry_strategy=NoRetry() has the same effect " - "as REJECT_ON_ERROR (one attempt, then delete). Pick one for clarity.", - UserWarning, - skip_file_prefixes=_WARN_SKIP_PREFIXES, - ) - if max_deliveries is not None and (retry_strategy is None or is_no_retry): - warnings.warn( - "max_deliveries is set but no retry_strategy is configured (or NoRetry was " - "passed); the delivery cap is unreachable on the happy path since the row " - "is deleted after the first attempt.", - UserWarning, - skip_file_prefixes=_WARN_SKIP_PREFIXES, - ) - if lease_ttl_seconds <= max_fetch_interval: - warnings.warn( - f"lease_ttl_seconds ({lease_ttl_seconds}) <= max_fetch_interval " - f"({max_fetch_interval}): a lease can expire during a single idle wait " - f"before the next fetch even runs, causing spurious lease-expiry reclaim " - f"of healthy in-flight rows. Recommended: lease_ttl_seconds >= " - f"2 * max_fetch_interval + P99(handler).", - UserWarning, - skip_file_prefixes=_WARN_SKIP_PREFIXES, - ) diff --git a/planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md b/planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md new file mode 100644 index 0000000..64e9dd7 --- /dev/null +++ b/planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md @@ -0,0 +1,74 @@ +--- +status: shipped +date: 2026-06-23 +slug: self-validating-subscriber-config +summary: Move subscriber-knob validation from the factory into OutboxSubscriberConfig.__post_init__ so every construction path is validated, not just the factory's. +supersedes: null +superseded_by: null +pr: null +outcome: | + Landed. Validation moved to OutboxSubscriberConfig.__post_init__ (guarded super-call + + self._validate()); factory.py dropped _validate_subscriber_config and now just wires. + Behavior preserved exactly (EMPTY→None ack_policy mapping). One wrinkle surfaced under + CI: moving validation under the dataclass-generated __init__ added a "" frame + that the 3.13 C warnings.warn(skip_file_prefixes=...) refuses to skip (works on 3.14), + so the FastAPI-router attribution test failed in docker. Replaced skip_file_prefixes + with a manual stacklevel walk (_subscriber_warn) that's version- and call-path-robust. + Existing validation tests passed as the regression guard; full suite 543 passed at 100%. +--- + +# Change: Make the subscriber config validate itself + +**Lane:** lightweight — 2 files, net-neutral LOC (a relocation), no new file, no +public-API change, existing tests cover it. + +## Goal + +Subscriber-knob validation lived in `subscriber/factory.py::_validate_subscriber_config`, +called from `create_subscriber`. `OutboxSubscriberConfig` was a bare `@dataclass` with no +`__post_init__`, so constructing it directly bypassed every guard. Move validation into +`OutboxSubscriberConfig.__post_init__` so *every* construction path — `@broker.subscriber`, +`@router.subscriber`, direct construction — is validated; there is no way in that skips it. + +This is candidate #3 from the 2026-06-23 architecture review. + +## Approach + +- `OutboxSubscriberConfig.__post_init__` does a guarded `super().__post_init__()` (upstream + `SubscriberUsecaseConfig` has none today, but dataclasses call only the most-derived one — + the guard keeps a future upstream init running) then `self._validate()`. +- `_validate()` holds the relocated checks. Behavior is **preserved exactly**: it maps + `_ack_policy is EMPTY → None` so the checks match on the *explicitly-passed* policy as the + factory-side validation did (e.g. the NACK+NoRetry advisory still fires only on an explicit + `NACK_ON_ERROR`, not on the default that resolves to it via the `ack_policy` property). +- `create_subscriber` drops the validation call and the function; it just wires now. + +**Warning attribution (version-robustness).** Moving validation under the dataclass-generated +`__init__` inserts a frame whose `co_filename` is the literal `""` between the user's +call and `__post_init__`. The 3.13 C `warnings.warn(skip_file_prefixes=...)` does **not** skip +that frame (3.14 does), so warnings mis-attributed to `` and the FastAPI-router +attribution test failed under docker/3.13. Replaced `skip_file_prefixes` with `_subscriber_warn`, +which computes `stacklevel` by walking out to the first non-internal frame (this package / +faststream / ``) — robust across CPython versions and across the direct vs router paths +(which differ in frame count). + +**Why minimal — broader "config validates everything" not pursued.** Only subscriber-knob +validation moved. Other config objects (`OutboxBrokerConfig`, publisher config) were not +swept in; this change is scoped to the one shallow factory the review flagged. + +## Files + +- `faststream_outbox/subscriber/config.py` — add `__post_init__` + `_validate` + + `_subscriber_warn` (manual-stacklevel attribution). +- `faststream_outbox/subscriber/factory.py` — remove `_validate_subscriber_config` and its + call; keep construction/wiring. + +## Verification + +- [x] Existing tests are the regression guard (unchanged): `test_unit.py` subscriber + reject/warn/no-warning cases (via `@broker.subscriber`), `test_fastapi.py` + `test_subscriber_misconfig_warning_attributed_to_user_via_fastapi_router` (router path + attribution), `test_fake.py::test_subscriber_config_ack_policy_returns_explicit_value` + (direct construction). +- [x] `just test` — 543 passed, 100% coverage (docker / Python 3.13). +- [x] `just lint-ci` — clean. From e10bf6af68e1aebc4ea32670d5ea5eb070b96c5a Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 18:56:31 +0300 Subject: [PATCH 2/2] docs(planning): record PR #111 on self-validating-subscriber-config change Co-Authored-By: Claude Opus 4.8 (1M context) --- .../2026-06-23.03-self-validating-subscriber-config/change.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md b/planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md index 64e9dd7..5686b40 100644 --- a/planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md +++ b/planning/changes/2026-06-23.03-self-validating-subscriber-config/change.md @@ -5,7 +5,7 @@ slug: self-validating-subscriber-config summary: Move subscriber-knob validation from the factory into OutboxSubscriberConfig.__post_init__ so every construction path is validated, not just the factory's. supersedes: null superseded_by: null -pr: null +pr: 111 outcome: | Landed. Validation moved to OutboxSubscriberConfig.__post_init__ (guarded super-call + self._validate()); factory.py dropped _validate_subscriber_config and now just wires.