From e0aa4120b81144ced9e801764baa57c51d831e32 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 11:08:52 +0300 Subject: [PATCH 1/6] fix: pass handler exception to RetryStrategyProto AcknowledgementMiddleware catches handler exceptions in its own __aexit__ and calls message.nack() directly, so the worker loop's defensive `except BaseException` never ran in practice. _nack saw last_exception=None and the documented "retry only on transient errors" subclass pattern (retry.py:13-15) was silently broken. Add a small BaseMiddleware whose __aexit__ stashes exc_val onto the OutboxInnerMessage. It sits at the top of the broker_middlewares tuple so its __aexit__ runs before AcknowledgementMiddleware's in stack-pop order, populating row.last_exception in time for nack to read it. Drop the now-truly-dead `except BaseException` block in _worker_loop. Tests cover both the happy path (exception is captured) and the documented subclass pattern (terminate on ValueError, retry on RuntimeError). Strengthen CLAUDE.md's import rule: never use local/inline imports. --- CLAUDE.md | 3 +- faststream_outbox/broker.py | 26 +++++++- faststream_outbox/subscriber/usecase.py | 8 +-- tests/test_fake.py | 86 +++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 649040e..b06cc41 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -59,6 +59,7 @@ The caller owns the `AsyncEngine`. `OutboxBrokerConfig.disconnect()` deliberatel ## Conventions -- Python 3.13+. Global imports (no inline imports) per user's global rules. +- Python 3.13+. +- **Never use local/inline imports.** All imports go at the top of the module — no `import` statements inside functions or methods; the `if TYPE_CHECKING` block is the only exception. This applies to test files too. If a `# noqa: PLC0415` is the only way to keep an import inline, hoist it instead. 
- `ruff` is set to `select = ["ALL"]` with a documented ignore list in `pyproject.toml`; many `# noqa: XXX` comments are intentional and align with that list. - Type checker is `ty`. Use `# ty: ignore[]` for intentional escapes (matches existing usage in `broker.py`, `registrator.py`). diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index 12e63e4..f9f747a 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -9,6 +9,7 @@ import logging import typing from collections.abc import Iterable, Sequence +from types import TracebackType from faststream import BaseMiddleware from faststream._internal.basic_types import LoggerProto @@ -40,6 +41,29 @@ from faststream_outbox.subscriber.usecase import OutboxSubscriber +class _CaptureExceptionMiddleware(BaseMiddleware): + """ + Stash the handler exception on the inner row before AckMiddleware nacks. + + FastStream's AcknowledgementMiddleware catches the handler exception in its + own ``__aexit__`` and calls ``message.nack()`` directly — the exception never + propagates back to the worker loop. Without this middleware, ``OutboxInnerMessage._nack`` + sees ``last_exception=None`` and retry strategies that branch on exception type + can't work. We sit one step closer to the handler in the middleware stack so + our ``__aexit__`` runs before AckMiddleware's, capturing ``exc_val`` onto the row. 
+ """ + + async def __aexit__( + self, + exc_type: type[BaseException] | None = None, + exc_val: BaseException | None = None, + exc_tb: TracebackType | None = None, + ) -> bool | None: + if exc_val is not None and isinstance(self.msg, OutboxInnerMessage): + self.msg.last_exception = exc_val + return False + + class OutboxParamsStorage(DefaultLoggerStorage): _max_msg_id_ln = -1 _max_queue_name = 7 @@ -100,7 +124,7 @@ def __init__( # noqa: PLR0913 engine_state=engine_state, outbox_table=outbox_table, client=client, - broker_middlewares=middlewares, + broker_middlewares=(_CaptureExceptionMiddleware, *middlewares), broker_parser=parser, broker_decoder=decoder, logger=make_logger_state( diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index c0a37f4..e4f00c2 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -151,13 +151,11 @@ async def _worker_loop(self) -> None: if not row.allow_delivery(max_deliveries=self._config.max_deliveries, logger=logger): await self._flush_terminal(row) continue - # FastStream's AckPolicy middleware catches handler exceptions before - # they reach this except, so this branch is defensive. + # AckPolicy middleware catches handler exceptions; _CaptureExceptionMiddleware + # stashes exc onto row.last_exception before nack runs, so retry strategies + # can branch on exception type. 
try: await self.consume(row) - except BaseException as exc: # pragma: no cover - row.last_exception = exc - raise finally: await row.assert_state_set(logger) await self._flush_result(row) diff --git a/tests/test_fake.py b/tests/test_fake.py index 2a27855..822ddf7 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -8,6 +8,7 @@ from faststream_outbox import ( ConstantRetry, OutboxBroker, + RetryStrategyProto, TestOutboxBroker, make_outbox_table, ) @@ -548,3 +549,88 @@ async def test_fake_connect_is_noop() -> None: test_broker = TestOutboxBroker(broker) # Direct call exercises L226 even though it's also called during __aenter__. await test_broker._fake_connect(broker) # noqa: SLF001 + + +async def test_retry_strategy_receives_handler_exception() -> None: + """RetryStrategyProto.get_next_attempt_at must see the raised exception, not None.""" + seen_exceptions: list[BaseException | None] = [] + + class RecordingStrategy(RetryStrategyProto): + def get_next_attempt_at( + self, + *, + first_attempt_at: _dt.datetime, # noqa: ARG002 + last_attempt_at: _dt.datetime, # noqa: ARG002 + attempts_count: int, # noqa: ARG002 + exception: BaseException | None = None, + ) -> _dt.datetime | None: + seen_exceptions.append(exception) + return None # terminal so the test wraps up promptly + + broker = _make_broker() + + @broker.subscriber( + "orders", + min_fetch_interval=0.01, + max_fetch_interval=0.05, + retry_strategy=RecordingStrategy(), + ) + async def handle(body: str) -> None: + del body + msg = "boom-transient" + raise RuntimeError(msg) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + p, h = encode_payload("payload") + test_broker.feed("orders", p, headers=h) + await _wait_until(lambda: seen_exceptions, timeout=3.0) + + assert len(seen_exceptions) >= 1 + exc = seen_exceptions[0] + assert isinstance(exc, RuntimeError) + assert str(exc) == "boom-transient" + + +async def test_retry_strategy_can_branch_on_exception_type() -> None: + """Subclass pattern 
from retry.py docstring: retry transient, terminate on permanent.""" + attempts: list[str] = [] + + class TransientOnlyStrategy(RetryStrategyProto): + def get_next_attempt_at( + self, + *, + first_attempt_at: _dt.datetime, # noqa: ARG002 + last_attempt_at: _dt.datetime, + attempts_count: int, # noqa: ARG002 + exception: BaseException | None = None, + ) -> _dt.datetime | None: + if isinstance(exception, ValueError): + return None # permanent → terminal + return last_attempt_at + _dt.timedelta(seconds=0.05) # transient → retry + + broker = _make_broker() + + @broker.subscriber( + "orders", + min_fetch_interval=0.01, + max_fetch_interval=0.05, + retry_strategy=TransientOnlyStrategy(), + ) + async def handle(body: str) -> None: + attempts.append(body) + # First call: transient (gets retried via the strategy's retry branch). + # Second call: permanent (terminates via the strategy's None branch). + if len(attempts) == 1: + msg = "transient" + raise RuntimeError(msg) + msg = "permanent" + raise ValueError(msg) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + p, h = encode_payload("body") + test_broker.feed("orders", p, headers=h) + await _wait_until(lambda: not test_broker.fake_client.rows, timeout=5.0) + + assert len(attempts) == 2 # transient retried once, then permanent terminated From aeb9f755eb47302730dd6b79664e002b23fda16b Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 11:16:36 +0300 Subject: [PATCH 2/6] fix: drop OutboxRouter prefix to remove publish/subscribe asymmetry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OutboxRouter(prefix="svc-") used to silently rewrite subscriber queue names (subscriber("orders") → fetches "svc-orders") but broker.publish inserted the literal queue, so a router-prefixed subscriber would never see rows produced via broker.publish(queue="orders"). 
The producer side can't resolve a prefix coherently — a queue name can belong to multiple routers — so the only sane resolution is to drop the knob entirely. After this change, queues are routed by their literal name. If you want namespacing, put it in the queue string itself. - Remove `prefix` parameter from OutboxRouter.__init__. - Remove `OutboxSubscriberConfig.full_queues`; subscriber now reads `self.queues` directly. - Drop the prefix lookup in OutboxSubscriberSpecification.name (the spec name now uses the literal queue list). - Add a fake-broker regression test that exercises router-registered subscriber + plain-queue feed end-to-end. --- faststream_outbox/router.py | 11 ++++++++--- faststream_outbox/subscriber/config.py | 5 ----- faststream_outbox/subscriber/usecase.py | 5 ++--- tests/test_fake.py | 23 +++++++++++++++++++++++ tests/test_unit.py | 4 ++-- 5 files changed, 35 insertions(+), 13 deletions(-) diff --git a/faststream_outbox/router.py b/faststream_outbox/router.py index f4f5bb9..55ef3c5 100644 --- a/faststream_outbox/router.py +++ b/faststream_outbox/router.py @@ -62,11 +62,17 @@ def __init__( # noqa: PLR0913 class OutboxRouter(OutboxRegistrator, BrokerRouter[OutboxInnerMessage, BrokerConfig]): - """Includable router for ``OutboxBroker``.""" + """ + Includable router for ``OutboxBroker``. + + Use it to register subscribers in a separate module and attach them to the + broker via ``broker.include_router(router)``. There is no ``prefix`` knob: + queues are routed by their literal name, so producers and consumers must + agree on the exact string. If you want namespacing, put it in the queue name. 
+ """ def __init__( # noqa: PLR0913 self, - prefix: str = "", handlers: Iterable[OutboxRoute] = (), *, dependencies: Iterable["Dependant"] = (), @@ -83,7 +89,6 @@ def __init__( # noqa: PLR0913 broker_parser=parser, broker_decoder=decoder, include_in_schema=include_in_schema, - prefix=prefix, ), handlers=handlers, # ty: ignore[unknown-argument] routers=routers, diff --git a/faststream_outbox/subscriber/config.py b/faststream_outbox/subscriber/config.py index d072b4c..205942b 100644 --- a/faststream_outbox/subscriber/config.py +++ b/faststream_outbox/subscriber/config.py @@ -26,11 +26,6 @@ class OutboxSubscriberConfig(SubscriberUsecaseConfig): release_stuck_interval: float max_deliveries: int | None - @property - def full_queues(self) -> list[str]: - prefix = self._outer_config.prefix or "" - return [f"{prefix}{q}" for q in self.queues] - @property def time_source(self) -> Callable[[], _dt.datetime]: return self._outer_config.time_source diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index e4f00c2..4155d65 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -46,9 +46,8 @@ class OutboxSubscriberSpecification(SubscriberSpecification["OutboxBrokerConfig", OutboxSubscriberSpecificationConfig]): @property def name(self) -> str: - prefix = getattr(self._outer_config, "prefix", "") joined = ",".join(self.config.queues) - return f"{prefix}{joined}:{self.call_name}" + return f"{joined}:{self.call_name}" def get_schema(self) -> dict[str, SubscriberSpec]: return { @@ -94,7 +93,7 @@ def _client(self) -> "OutboxClient": @property def _queues(self) -> list[str]: - return self._config.full_queues + return self._config.queues @typing.override async def start(self) -> None: diff --git a/tests/test_fake.py b/tests/test_fake.py index 822ddf7..0cb21ee 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -8,6 +8,7 @@ from faststream_outbox import ( ConstantRetry, OutboxBroker, + 
OutboxRouter, RetryStrategyProto, TestOutboxBroker, make_outbox_table, @@ -634,3 +635,25 @@ async def handle(body: str) -> None: await _wait_until(lambda: not test_broker.fake_client.rows, timeout=5.0) assert len(attempts) == 2 # transient retried once, then permanent terminated + + +async def test_router_subscriber_receives_plain_queue_publish() -> None: + """A subscriber registered via OutboxRouter must receive rows whose queue matches literally.""" + received: list[str] = [] + + router = OutboxRouter() + + @router.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + broker = _make_broker() + broker.include_router(router) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + p, h = encode_payload("via-router") + test_broker.feed("orders", p, headers=h) + await _wait_until(lambda: received, timeout=3.0) + + assert received == ["via-router"] diff --git a/tests/test_unit.py b/tests/test_unit.py index 4e2558e..f108786 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -348,7 +348,7 @@ async def second(body: dict) -> None: ... def test_router_can_be_constructed() -> None: - router = OutboxRouter(prefix="svc-") + router = OutboxRouter() assert router is not None @@ -585,7 +585,7 @@ async def handle(body: dict) -> None: ... 
assert callable(sub._config.time_source) # noqa: SLF001 -def test_subscriber_specification_name_with_prefix() -> None: +def test_subscriber_specification_name_lists_queues() -> None: metadata = MetaData() t = make_outbox_table(metadata) broker = OutboxBroker(outbox_table=t) From 807d8e3873c6f8899b787f2b004be504fdbf0839 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 11:20:46 +0300 Subject: [PATCH 3/6] chore: delete dead OutboxBrokerConfig fields MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OutboxBrokerConfig.outbox_table was set in the broker constructor but never read — the broker stores its own self._outbox_table for inserts. OutboxBrokerConfig.time_source had a default of _utcnow but was only read by OutboxSubscriberConfig.time_source, a property only ever read by tests; the actual subscriber code paths use module-level _utcnow() from message.py. Both attributes were traps: a user setting time_source expecting it to take effect would see no behavior change. Drop them, drop the subscriber-config passthrough property, and drop the two unit tests that exercised the dead path. 
--- faststream_outbox/broker.py | 1 - faststream_outbox/configs.py | 9 --------- faststream_outbox/subscriber/config.py | 6 ------ tests/test_unit.py | 20 -------------------- 4 files changed, 36 deletions(-) diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index f9f747a..df6700b 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -122,7 +122,6 @@ def __init__( # noqa: PLR0913 fd_config = FastDependsConfig(use_fastdepends=apply_types) broker_config = OutboxBrokerConfig( engine_state=engine_state, - outbox_table=outbox_table, client=client, broker_middlewares=(_CaptureExceptionMiddleware, *middlewares), broker_parser=parser, diff --git a/faststream_outbox/configs.py b/faststream_outbox/configs.py index 52a7ca6..bd4df0e 100644 --- a/faststream_outbox/configs.py +++ b/faststream_outbox/configs.py @@ -6,9 +6,7 @@ client and to subscribers. """ -import datetime as _dt import typing -from collections.abc import Callable from dataclasses import dataclass, field from faststream._internal.configs import BrokerConfig @@ -16,16 +14,11 @@ if typing.TYPE_CHECKING: - from sqlalchemy import Table from sqlalchemy.ext.asyncio import AsyncEngine from faststream_outbox.client import OutboxClient -def _utcnow() -> _dt.datetime: - return _dt.datetime.now(tz=_dt.UTC) - - class EngineState: """ Lazy holder so the broker can be constructed before the engine is wired up. @@ -51,9 +44,7 @@ def set_engine(self, engine: "AsyncEngine") -> None: @dataclass(kw_only=True) class OutboxBrokerConfig(BrokerConfig): engine_state: EngineState = field(default_factory=EngineState) - outbox_table: "Table | None" = None client: "OutboxClient | None" = None - time_source: Callable[[], _dt.datetime] = _utcnow async def connect(self) -> None: # Engine and client are wired up by the broker's constructor; nothing to do here. 
diff --git a/faststream_outbox/subscriber/config.py b/faststream_outbox/subscriber/config.py index 205942b..c8564a7 100644 --- a/faststream_outbox/subscriber/config.py +++ b/faststream_outbox/subscriber/config.py @@ -1,6 +1,4 @@ -import datetime as _dt import typing -from collections.abc import Callable from dataclasses import dataclass from faststream._internal.configs import SubscriberSpecificationConfig, SubscriberUsecaseConfig @@ -26,10 +24,6 @@ class OutboxSubscriberConfig(SubscriberUsecaseConfig): release_stuck_interval: float max_deliveries: int | None - @property - def time_source(self) -> Callable[[], _dt.datetime]: - return self._outer_config.time_source - @property def ack_policy(self) -> AckPolicy: if self._ack_policy is EMPTY: diff --git a/tests/test_unit.py b/tests/test_unit.py index f108786..92477ba 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -504,14 +504,6 @@ def test_outbox_router_config_engine_state_raises() -> None: _ = cfg.engine_state -def test_outbox_broker_config_uses_default_time_source() -> None: - from faststream_outbox.configs import OutboxBrokerConfig # noqa: PLC0415 - - cfg = OutboxBrokerConfig() - now = cfg.time_source() - assert now.tzinfo is not None # naive datetimes would be a regression - - # --- client --- @@ -573,18 +565,6 @@ async def handler(body: str) -> None: ... assert route is not None -def test_subscriber_config_time_source_property() -> None: - metadata = MetaData() - t = make_outbox_table(metadata) - broker = OutboxBroker(outbox_table=t) - - @broker.subscriber("orders") - async def handle(body: dict) -> None: ... 
- - sub = next(iter(broker._subscribers)) # noqa: SLF001 - assert callable(sub._config.time_source) # noqa: SLF001 - - def test_subscriber_specification_name_lists_queues() -> None: metadata = MetaData() t = make_outbox_table(metadata) From ca31d2c9e84aa87f454c11fa6f6c482f4ae596e5 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 11:25:21 +0300 Subject: [PATCH 4/6] chore: lower release_stuck row-released log to INFO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A subscriber with one consistently slow handler hits release_stuck on every interval. Logging that as WARNING produced a steady stream of alert-shaped messages for an entirely benign condition. INFO is the right level — operators who care can rate-alert via their log aggregator. The error path stays at ERROR. --- faststream_outbox/subscriber/usecase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 4155d65..8219d81 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -207,7 +207,7 @@ async def _release_stuck_loop(self) -> None: else: if released: self._log( - log_level=logging.WARNING, + log_level=logging.INFO, message=f"release_stuck reset {released} stale rows back to pending", ) await anyio.sleep(interval) From 23a41732e9267ac4c6e5290de449a7519564440d Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 12:11:56 +0300 Subject: [PATCH 5/6] fix: parameterize advisory lock SQL The release_stuck advisory-lock query was built via f-string against outbox_table.name. Blast radius is small (the table name comes from developer config, not user input), but it's a free SQL injection surface that goes away with one bindparam. Lock-key value is unchanged (hashtext over the same string) so existing deployments are unaffected. 
Add a unit test that constructs an OutboxClient with a hostile-looking table name and asserts the literal does not appear in the compiled SQL, locking the parameterization in against future refactors. --- faststream_outbox/client.py | 5 +++-- tests/test_unit.py | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index b028b5f..5f52f4f 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -52,8 +52,9 @@ def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None: self._engine = engine self._table = outbox_table # Stable advisory-lock key derived from the table name; ``hashtext`` returns int4. - self._advisory_lock_sql = text( - f"SELECT pg_try_advisory_xact_lock(hashtext('faststream_outbox:{outbox_table.name}'))" + # Parameterized to keep the table name out of SQL string interpolation. + self._advisory_lock_sql = text("SELECT pg_try_advisory_xact_lock(hashtext(:lock_key))").bindparams( + lock_key=f"faststream_outbox:{outbox_table.name}" ) @property diff --git a/tests/test_unit.py b/tests/test_unit.py index 92477ba..5f6564c 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -15,6 +15,7 @@ OutboxState, make_outbox_table, ) +from faststream_outbox.client import OutboxClient from faststream_outbox.envelope import _encode_payload from faststream_outbox.message import OutboxInnerMessage, OutboxMessage from faststream_outbox.parser.parser import OutboxParser @@ -525,6 +526,20 @@ async def test_client_fetch_empty_queues_returns_empty() -> None: assert await client.fetch([], limit=10) == [] +def test_advisory_lock_sql_parameterizes_table_name() -> None: + """Table name must be a bound parameter, never interpolated as SQL literal.""" + metadata = MetaData() + hostile = "x'); DROP TABLE x; --" + t = make_outbox_table(metadata, table_name=hostile) + client = OutboxClient(AsyncMock(), t) + + compiled = client._advisory_lock_sql.compile() # noqa: 
SLF001 +    # Table name must NOT appear in the SQL string itself... +    assert hostile not in str(compiled) +    # ...but must appear as a bound param value. +    assert any(hostile in str(v) for v in compiled.params.values()) + + # --- OutboxMessage.reject + assert_state_set logger branch --- From 14f19250e29bfd48e2565cc1a90ff54653436593 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 12:20:57 +0300 Subject: [PATCH 6/6] feat!: retry strategies return delay seconds; DB computes next_attempt_at Prior to this change, the retry strategy returned an absolute datetime computed from a Python-clock last_attempt_at, but the fetch query checked next_attempt_at <= func.now() against the DB clock. With drift between worker and DB hosts, retries fired at the wrong time. CLAUDE.md already called out the same asymmetry as fixed for release_stuck via make_interval; do the same here. RetryStrategyProto.get_next_attempt_at -> get_next_attempt_delay; return type changes from datetime | None to float | None (seconds). Strategy implementations no longer touch datetime arithmetic. The broker reads the delay off the row in _flush_retry and hands it to mark_pending_with_lease, which builds next_attempt_at server-side via now() + make_interval(secs => :delay). FakeOutboxClient mirrors the new shape with _utcnow() + timedelta. OutboxInnerMessage gets a pending_delay_seconds field that _nack populates and _flush_retry reads. Breaking change to RetryStrategyProto. v0, no compat shim. Integration test asserts next_attempt_at lands in [db_before + delay, db_after + delay], where db_before and db_after are clock_timestamp() samples taken on either side of the call, locking in the server-side computation. 
--- faststream_outbox/client.py | 10 ++++--- faststream_outbox/message.py | 11 ++++--- faststream_outbox/retry.py | 37 +++++++++++------------ faststream_outbox/subscriber/usecase.py | 4 +-- faststream_outbox/testing.py | 4 +-- tests/test_fake.py | 14 ++++----- tests/test_integration.py | 33 +++++++++++++++++++-- tests/test_unit.py | 39 ++++++++++++------------- 8 files changed, 92 insertions(+), 60 deletions(-) diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index 5f52f4f..b3f5d85 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -114,7 +114,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 message_id: int, acquired_token: uuid.UUID, *, - next_attempt_at: _dt.datetime, + delay_seconds: float, attempts_count: int, first_attempt_at: _dt.datetime, last_attempt_at: _dt.datetime, @@ -122,15 +122,17 @@ async def mark_pending_with_lease( # noqa: PLR0913 """ Move *message_id* back to ``pending`` for retry, iff it still holds the lease. - Returns True if the row was updated. + ``next_attempt_at`` is computed server-side as ``now() + delay_seconds`` so + retry timing uses the DB clock, not the worker's. Returns True if the row was updated. 
""" t = self._table + next_attempt_at_expr = func.now() + func.make_interval(0, 0, 0, 0, 0, 0, bindparam("delay", type_=Float)) stmt = ( update(t) .where(t.c.id == message_id, t.c.acquired_token == acquired_token) .values( state=OutboxState.PENDING.value, - next_attempt_at=next_attempt_at, + next_attempt_at=next_attempt_at_expr, attempts_count=attempts_count, first_attempt_at=first_attempt_at, last_attempt_at=last_attempt_at, @@ -139,7 +141,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 ) ) async with self._engine.begin() as conn: - result = await conn.execute(stmt) + result = await conn.execute(stmt, {"delay": max(0.0, delay_seconds)}) return (result.rowcount or 0) > 0 async def release_stuck(self, *, timeout_seconds: float) -> int: diff --git a/faststream_outbox/message.py b/faststream_outbox/message.py index 3ff4cf1..1548cf7 100644 --- a/faststream_outbox/message.py +++ b/faststream_outbox/message.py @@ -59,6 +59,9 @@ class OutboxInnerMessage: state_set: bool = field(default=False, init=False) to_delete: bool = field(default=False, init=False) + # Set by ``_nack`` when the strategy schedules a retry; consumed by the + # subscriber's ``_flush_retry`` to drive ``mark_pending_with_lease``. 
+ pending_delay_seconds: float | None = field(default=None, init=False) async def ack(self) -> None: await self._update_state_if_not_set(self._ack) @@ -81,8 +84,8 @@ async def _ack(self) -> None: async def _nack(self) -> None: self._record_attempt() - next_at = ( - self.retry_strategy.get_next_attempt_at( + delay = ( + self.retry_strategy.get_next_attempt_delay( first_attempt_at=self.first_attempt_at or self.last_attempt_at, # ty: ignore[invalid-argument-type] last_attempt_at=self.last_attempt_at, # ty: ignore[invalid-argument-type] attempts_count=self.attempts_count, @@ -91,10 +94,10 @@ async def _nack(self) -> None: if self.retry_strategy is not None else None ) - if next_at is None: + if delay is None: self.to_delete = True else: - self.next_attempt_at = next_at + self.pending_delay_seconds = delay async def _reject(self) -> None: self._record_attempt() diff --git a/faststream_outbox/retry.py b/faststream_outbox/retry.py index 8cd1595..f1b9974 100644 --- a/faststream_outbox/retry.py +++ b/faststream_outbox/retry.py @@ -1,27 +1,30 @@ import random from abc import ABC, abstractmethod from dataclasses import dataclass, field -from datetime import datetime, timedelta +from datetime import datetime from typing import Protocol class RetryStrategyProto(Protocol): """ - Decides whether a Nack'ed row gets another attempt and when. - - Implementations return ``None`` to signal terminal failure (the row will be - deleted). The current ``exception`` (if any) is passed through so users can - subclass to retry only on transient errors. + Decides whether a Nack'ed row gets another attempt and how long to wait. + + Implementations return the delay in seconds before the next attempt, or + ``None`` to signal terminal failure (the row will be deleted). The DB + computes the actual ``next_attempt_at`` from this delay using its own clock, + so retry timing is immune to skew between worker and DB hosts. 
The current + ``exception`` (if any) is passed through so users can subclass to retry only + on transient errors. """ - def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int, exception: BaseException | None = None, - ) -> datetime | None: ... + ) -> float | None: ... @dataclass(kw_only=True) @@ -32,31 +35,29 @@ class _RetryStrategyTemplate(ABC, RetryStrategyProto): @abstractmethod def _delay_seconds(self, *, attempts_count: int) -> float: ... - def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int, exception: BaseException | None = None, # noqa: ARG002 - ) -> datetime | None: + ) -> float | None: if self.max_attempts is not None and attempts_count >= self.max_attempts: return None delay = self._delay_seconds(attempts_count=attempts_count) - next_attempt_at = last_attempt_at + timedelta(seconds=delay) - if ( - self.max_total_delay_seconds is not None - and (next_attempt_at - first_attempt_at).total_seconds() > self.max_total_delay_seconds - ): - return None - return next_attempt_at + if self.max_total_delay_seconds is not None: + elapsed_so_far = (last_attempt_at - first_attempt_at).total_seconds() + if elapsed_so_far + delay > self.max_total_delay_seconds: + return None + return delay @dataclass(kw_only=True) class NoRetry(RetryStrategyProto): """No retry — first nack is terminal.""" - def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: datetime, # noqa: ARG002 diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 8219d81..18fe6d2 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -180,12 +180,12 @@ async def _flush_terminal(self, row: OutboxInnerMessage) -> None: ) async def _flush_retry(self, row: OutboxInnerMessage) -> None: - if row.acquired_token is None: + if 
row.acquired_token is None or row.pending_delay_seconds is None: return updated = await self._client.mark_pending_with_lease( row.id, row.acquired_token, - next_attempt_at=row.next_attempt_at, + delay_seconds=row.pending_delay_seconds, attempts_count=row.attempts_count, first_attempt_at=row.first_attempt_at, # ty: ignore[invalid-argument-type] last_attempt_at=row.last_attempt_at, # ty: ignore[invalid-argument-type] diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 030f374..e91d8e4 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -115,7 +115,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 message_id: int, acquired_token: uuid.UUID, *, - next_attempt_at: _dt.datetime, + delay_seconds: float, attempts_count: int, first_attempt_at: _dt.datetime, last_attempt_at: _dt.datetime, @@ -123,7 +123,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 for row in self._rows: if row.id == message_id and row.acquired_token == acquired_token: row.state = OutboxState.PENDING.value - row.next_attempt_at = next_attempt_at + row.next_attempt_at = _utcnow() + _dt.timedelta(seconds=max(0.0, delay_seconds)) row.attempts_count = attempts_count row.first_attempt_at = first_attempt_at row.last_attempt_at = last_attempt_at diff --git a/tests/test_fake.py b/tests/test_fake.py index 0cb21ee..fbbad0c 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -553,18 +553,18 @@ async def test_fake_connect_is_noop() -> None: async def test_retry_strategy_receives_handler_exception() -> None: - """RetryStrategyProto.get_next_attempt_at must see the raised exception, not None.""" + """RetryStrategyProto.get_next_attempt_delay must see the raised exception, not None.""" seen_exceptions: list[BaseException | None] = [] class RecordingStrategy(RetryStrategyProto): - def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: _dt.datetime, # noqa: ARG002 last_attempt_at: _dt.datetime, # noqa: ARG002 
attempts_count: int, # noqa: ARG002 exception: BaseException | None = None, - ) -> _dt.datetime | None: + ) -> float | None: seen_exceptions.append(exception) return None # terminal so the test wraps up promptly @@ -598,17 +598,17 @@ async def test_retry_strategy_can_branch_on_exception_type() -> None: attempts: list[str] = [] class TransientOnlyStrategy(RetryStrategyProto): - def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: _dt.datetime, # noqa: ARG002 - last_attempt_at: _dt.datetime, + last_attempt_at: _dt.datetime, # noqa: ARG002 attempts_count: int, # noqa: ARG002 exception: BaseException | None = None, - ) -> _dt.datetime | None: + ) -> float | None: if isinstance(exception, ValueError): return None # permanent → terminal - return last_attempt_at + _dt.timedelta(seconds=0.05) # transient → retry + return 0.05 # transient → retry broker = _make_broker() diff --git a/tests/test_integration.py b/tests/test_integration.py index c9e7556..2bdd1f6 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -5,7 +5,7 @@ import uuid import pytest -from sqlalchemy import insert, select +from sqlalchemy import insert, select, text from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker from faststream_outbox import ( @@ -121,11 +121,10 @@ async def test_mark_pending_with_lease(pg_engine, outbox_table) -> None: client = OutboxClient(pg_engine, outbox_table) rows = await client.fetch(["orders"], limit=1) msg = rows[0] - future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=10) updated = await client.mark_pending_with_lease( msg.id, msg.acquired_token, # ty: ignore[invalid-argument-type] - next_attempt_at=future, + delay_seconds=600.0, # 10 minutes in the future attempts_count=1, first_attempt_at=_dt.datetime.now(tz=_dt.UTC), last_attempt_at=_dt.datetime.now(tz=_dt.UTC), @@ -136,6 +135,34 @@ async def test_mark_pending_with_lease(pg_engine, outbox_table) -> None: assert rows2 == [] +async def 
test_mark_pending_with_lease_uses_db_clock(pg_engine, outbox_table) -> None:
+    """next_attempt_at must be computed server-side as now() + delay, not from the worker's clock."""
+    async with pg_engine.begin() as conn:
+        await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
+    client = OutboxClient(pg_engine, outbox_table)
+    rows = await client.fetch(["orders"], limit=1)
+    msg = rows[0]
+    delay = 10.0
+    # Use clock_timestamp(), not now(): now() is pinned at transaction start and
+    # would not advance between our "before" and "after" reads.
+    async with pg_engine.connect() as conn:
+        db_before = (await conn.execute(text("SELECT clock_timestamp()"))).scalar()
+    await client.mark_pending_with_lease(
+        msg.id,
+        msg.acquired_token,  # ty: ignore[invalid-argument-type]
+        delay_seconds=delay,
+        attempts_count=1,
+        first_attempt_at=_dt.datetime.now(tz=_dt.UTC),
+        last_attempt_at=_dt.datetime.now(tz=_dt.UTC),
+    )
+    async with pg_engine.connect() as conn:
+        db_after = (await conn.execute(text("SELECT clock_timestamp()"))).scalar()
+        next_at = (await conn.execute(select(outbox_table.c.next_attempt_at))).scalar_one()
+    # next_attempt_at was set inside the mark_pending_with_lease transaction whose
+    # now() falls between db_before and db_after.
+ assert db_before + _dt.timedelta(seconds=delay) <= next_at <= db_after + _dt.timedelta(seconds=delay) + + async def test_release_stuck_recovers_old_processing_rows(pg_engine, outbox_table) -> None: async with pg_engine.begin() as conn: await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) diff --git a/tests/test_unit.py b/tests/test_unit.py index 5f6564c..5428e98 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -102,7 +102,7 @@ def _make_times() -> tuple[_dt.datetime, _dt.datetime]: def test_no_retry_always_terminal() -> None: first, last = _make_times() assert ( - NoRetry().get_next_attempt_at( + NoRetry().get_next_attempt_delay( first_attempt_at=first, last_attempt_at=last, attempts_count=1, @@ -111,53 +111,52 @@ def test_no_retry_always_terminal() -> None: ) -def test_constant_retry_returns_last_plus_delay() -> None: +def test_constant_retry_returns_delay() -> None: first, last = _make_times() - next_at = ConstantRetry(delay_seconds=30).get_next_attempt_at( + delay = ConstantRetry(delay_seconds=30).get_next_attempt_delay( first_attempt_at=first, last_attempt_at=last, attempts_count=1, ) - assert next_at == last + _dt.timedelta(seconds=30) + assert delay == 30.0 def test_constant_retry_max_attempts_reached() -> None: first, last = _make_times() s = ConstantRetry(delay_seconds=1, max_attempts=3) - assert s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=3) is None - assert s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=2) is not None + assert s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=3) is None + assert s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=2) is not None def test_constant_retry_max_total_delay_exceeded() -> None: first, last = _make_times() s = ConstantRetry(delay_seconds=100, max_total_delay_seconds=50) - assert s.get_next_attempt_at(first_attempt_at=first, 
last_attempt_at=last, attempts_count=1) is None + assert s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=1) is None def test_linear_retry_grows_with_attempts() -> None: first, last = _make_times() s = LinearRetry(initial_delay_seconds=10, step_seconds=5) - n1 = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=1) - n2 = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=3) - assert n1 is not None - assert n2 is not None - assert n2 > n1 + d1 = s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=1) + d2 = s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=3) + assert d1 is not None + assert d2 is not None + assert d2 > d1 def test_exponential_retry_caps_at_max_delay() -> None: first, last = _make_times() s = ExponentialRetry(initial_delay_seconds=1, multiplier=2, max_delay_seconds=10) - next_at = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=10) - assert next_at == last + _dt.timedelta(seconds=10) + delay = s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=10) + assert delay == 10.0 def test_exponential_retry_with_jitter_within_bounds() -> None: first, last = _make_times() s = ExponentialRetry(initial_delay_seconds=10, multiplier=1.0, jitter_factor=0.5) - next_at = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=1) - assert next_at is not None - delta = (next_at - last).total_seconds() - assert 10.0 <= delta <= 15.0 + delay = s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=1) + assert delay is not None + assert 10.0 <= delay <= 15.0 # --- OutboxInnerMessage state machine --- @@ -201,7 +200,7 @@ async def test_inner_message_nack_with_strategy_schedules_retry() -> None: await msg.nack() assert not msg.to_delete assert msg.last_attempt_at is not None - 
assert msg.next_attempt_at > msg.last_attempt_at + assert msg.pending_delay_seconds == 60.0 async def test_inner_message_reject_is_terminal() -> None: @@ -647,7 +646,7 @@ async def test_fake_client_mark_pending_miss() -> None: updated = await client.mark_pending_with_lease( 999, uuid.uuid4(), - next_attempt_at=now, + delay_seconds=0.0, attempts_count=1, first_attempt_at=now, last_attempt_at=now,