diff --git a/CLAUDE.md b/CLAUDE.md index 649040e..b06cc41 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -59,6 +59,7 @@ The caller owns the `AsyncEngine`. `OutboxBrokerConfig.disconnect()` deliberatel ## Conventions -- Python 3.13+. Global imports (no inline imports) per user's global rules. +- Python 3.13+. +- **Never use local/inline imports.** All imports go at the top of the module — no `import` statements inside functions or methods, the `if TYPE_CHECKING` block being the only exception. This applies to test files too. If a `# noqa: PLC0415` is the only way to keep an import inline, hoist it instead. - `ruff` is set to `select = ["ALL"]` with a documented ignore list in `pyproject.toml`; many `# noqa: XXX` comments are intentional and align with that list. - Type checker is `ty`. Use `# ty: ignore[]` for intentional escapes (matches existing usage in `broker.py`, `registrator.py`). diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index 12e63e4..df6700b 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -9,6 +9,7 @@ import logging import typing from collections.abc import Iterable, Sequence +from types import TracebackType from faststream import BaseMiddleware from faststream._internal.basic_types import LoggerProto @@ -40,6 +41,29 @@ from faststream_outbox.subscriber.usecase import OutboxSubscriber +class _CaptureExceptionMiddleware(BaseMiddleware): + """ + Stash the handler exception on the inner row before AckMiddleware nacks. + + FastStream's AcknowledgementMiddleware catches the handler exception in its + own ``__aexit__`` and calls ``message.nack()`` directly — the exception never + propagates back to the worker loop. Without this middleware, ``OutboxInnerMessage._nack`` + sees ``last_exception=None`` and retry strategies that branch on exception type + can't work. We sit one step closer to the handler in the middleware stack so + our ``__aexit__`` runs before AckMiddleware's, capturing ``exc_val`` onto the row. 
+ """ + + async def __aexit__( + self, + exc_type: type[BaseException] | None = None, + exc_val: BaseException | None = None, + exc_tb: TracebackType | None = None, + ) -> bool | None: + if exc_val is not None and isinstance(self.msg, OutboxInnerMessage): + self.msg.last_exception = exc_val + return False + + class OutboxParamsStorage(DefaultLoggerStorage): _max_msg_id_ln = -1 _max_queue_name = 7 @@ -98,9 +122,8 @@ def __init__( # noqa: PLR0913 fd_config = FastDependsConfig(use_fastdepends=apply_types) broker_config = OutboxBrokerConfig( engine_state=engine_state, - outbox_table=outbox_table, client=client, - broker_middlewares=middlewares, + broker_middlewares=(_CaptureExceptionMiddleware, *middlewares), broker_parser=parser, broker_decoder=decoder, logger=make_logger_state( diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index b028b5f..b3f5d85 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -52,8 +52,9 @@ def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None: self._engine = engine self._table = outbox_table # Stable advisory-lock key derived from the table name; ``hashtext`` returns int4. - self._advisory_lock_sql = text( - f"SELECT pg_try_advisory_xact_lock(hashtext('faststream_outbox:{outbox_table.name}'))" + # Parameterized to keep the table name out of SQL string interpolation. + self._advisory_lock_sql = text("SELECT pg_try_advisory_xact_lock(hashtext(:lock_key))").bindparams( + lock_key=f"faststream_outbox:{outbox_table.name}" ) @property @@ -113,7 +114,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 message_id: int, acquired_token: uuid.UUID, *, - next_attempt_at: _dt.datetime, + delay_seconds: float, attempts_count: int, first_attempt_at: _dt.datetime, last_attempt_at: _dt.datetime, @@ -121,15 +122,17 @@ async def mark_pending_with_lease( # noqa: PLR0913 """ Move *message_id* back to ``pending`` for retry, iff it still holds the lease. - Returns True if the row was updated. 
+ ``next_attempt_at`` is computed server-side as ``now() + delay_seconds`` so + retry timing uses the DB clock, not the worker's. Returns True if the row was updated. """ t = self._table + next_attempt_at_expr = func.now() + func.make_interval(0, 0, 0, 0, 0, 0, bindparam("delay", type_=Float)) stmt = ( update(t) .where(t.c.id == message_id, t.c.acquired_token == acquired_token) .values( state=OutboxState.PENDING.value, - next_attempt_at=next_attempt_at, + next_attempt_at=next_attempt_at_expr, attempts_count=attempts_count, first_attempt_at=first_attempt_at, last_attempt_at=last_attempt_at, @@ -138,7 +141,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 ) ) async with self._engine.begin() as conn: - result = await conn.execute(stmt) + result = await conn.execute(stmt, {"delay": max(0.0, delay_seconds)}) return (result.rowcount or 0) > 0 async def release_stuck(self, *, timeout_seconds: float) -> int: diff --git a/faststream_outbox/configs.py b/faststream_outbox/configs.py index 52a7ca6..bd4df0e 100644 --- a/faststream_outbox/configs.py +++ b/faststream_outbox/configs.py @@ -6,9 +6,7 @@ client and to subscribers. """ -import datetime as _dt import typing -from collections.abc import Callable from dataclasses import dataclass, field from faststream._internal.configs import BrokerConfig @@ -16,16 +14,11 @@ if typing.TYPE_CHECKING: - from sqlalchemy import Table from sqlalchemy.ext.asyncio import AsyncEngine from faststream_outbox.client import OutboxClient -def _utcnow() -> _dt.datetime: - return _dt.datetime.now(tz=_dt.UTC) - - class EngineState: """ Lazy holder so the broker can be constructed before the engine is wired up. 
@@ -51,9 +44,7 @@ def set_engine(self, engine: "AsyncEngine") -> None: @dataclass(kw_only=True) class OutboxBrokerConfig(BrokerConfig): engine_state: EngineState = field(default_factory=EngineState) - outbox_table: "Table | None" = None client: "OutboxClient | None" = None - time_source: Callable[[], _dt.datetime] = _utcnow async def connect(self) -> None: # Engine and client are wired up by the broker's constructor; nothing to do here. diff --git a/faststream_outbox/message.py b/faststream_outbox/message.py index 3ff4cf1..1548cf7 100644 --- a/faststream_outbox/message.py +++ b/faststream_outbox/message.py @@ -59,6 +59,9 @@ class OutboxInnerMessage: state_set: bool = field(default=False, init=False) to_delete: bool = field(default=False, init=False) + # Set by ``_nack`` when the strategy schedules a retry; consumed by the + # subscriber's ``_flush_retry`` to drive ``mark_pending_with_lease``. + pending_delay_seconds: float | None = field(default=None, init=False) async def ack(self) -> None: await self._update_state_if_not_set(self._ack) @@ -81,8 +84,8 @@ async def _ack(self) -> None: async def _nack(self) -> None: self._record_attempt() - next_at = ( - self.retry_strategy.get_next_attempt_at( + delay = ( + self.retry_strategy.get_next_attempt_delay( first_attempt_at=self.first_attempt_at or self.last_attempt_at, # ty: ignore[invalid-argument-type] last_attempt_at=self.last_attempt_at, # ty: ignore[invalid-argument-type] attempts_count=self.attempts_count, @@ -91,10 +94,10 @@ async def _nack(self) -> None: if self.retry_strategy is not None else None ) - if next_at is None: + if delay is None: self.to_delete = True else: - self.next_attempt_at = next_at + self.pending_delay_seconds = delay async def _reject(self) -> None: self._record_attempt() diff --git a/faststream_outbox/retry.py b/faststream_outbox/retry.py index 8cd1595..f1b9974 100644 --- a/faststream_outbox/retry.py +++ b/faststream_outbox/retry.py @@ -1,27 +1,30 @@ import random from abc import ABC, 
abstractmethod from dataclasses import dataclass, field -from datetime import datetime, timedelta +from datetime import datetime from typing import Protocol class RetryStrategyProto(Protocol): """ - Decides whether a Nack'ed row gets another attempt and when. - - Implementations return ``None`` to signal terminal failure (the row will be - deleted). The current ``exception`` (if any) is passed through so users can - subclass to retry only on transient errors. + Decides whether a Nack'ed row gets another attempt and how long to wait. + + Implementations return the delay in seconds before the next attempt, or + ``None`` to signal terminal failure (the row will be deleted). The DB + computes the actual ``next_attempt_at`` from this delay using its own clock, + so retry timing is immune to skew between worker and DB hosts. The current + ``exception`` (if any) is passed through so users can subclass to retry only + on transient errors. """ - def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int, exception: BaseException | None = None, - ) -> datetime | None: ... + ) -> float | None: ... @dataclass(kw_only=True) @@ -32,31 +35,29 @@ class _RetryStrategyTemplate(ABC, RetryStrategyProto): @abstractmethod def _delay_seconds(self, *, attempts_count: int) -> float: ... 
- def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int, exception: BaseException | None = None, # noqa: ARG002 - ) -> datetime | None: + ) -> float | None: if self.max_attempts is not None and attempts_count >= self.max_attempts: return None delay = self._delay_seconds(attempts_count=attempts_count) - next_attempt_at = last_attempt_at + timedelta(seconds=delay) - if ( - self.max_total_delay_seconds is not None - and (next_attempt_at - first_attempt_at).total_seconds() > self.max_total_delay_seconds - ): - return None - return next_attempt_at + if self.max_total_delay_seconds is not None: + elapsed_so_far = (last_attempt_at - first_attempt_at).total_seconds() + if elapsed_so_far + delay > self.max_total_delay_seconds: + return None + return delay @dataclass(kw_only=True) class NoRetry(RetryStrategyProto): """No retry — first nack is terminal.""" - def get_next_attempt_at( + def get_next_attempt_delay( self, *, first_attempt_at: datetime, # noqa: ARG002 diff --git a/faststream_outbox/router.py b/faststream_outbox/router.py index f4f5bb9..55ef3c5 100644 --- a/faststream_outbox/router.py +++ b/faststream_outbox/router.py @@ -62,11 +62,17 @@ def __init__( # noqa: PLR0913 class OutboxRouter(OutboxRegistrator, BrokerRouter[OutboxInnerMessage, BrokerConfig]): - """Includable router for ``OutboxBroker``.""" + """ + Includable router for ``OutboxBroker``. + + Use it to register subscribers in a separate module and attach them to the + broker via ``broker.include_router(router)``. There is no ``prefix`` knob: + queues are routed by their literal name, so producers and consumers must + agree on the exact string. If you want namespacing, put it in the queue name. 
+ """ def __init__( # noqa: PLR0913 self, - prefix: str = "", handlers: Iterable[OutboxRoute] = (), *, dependencies: Iterable["Dependant"] = (), @@ -83,7 +89,6 @@ def __init__( # noqa: PLR0913 broker_parser=parser, broker_decoder=decoder, include_in_schema=include_in_schema, - prefix=prefix, ), handlers=handlers, # ty: ignore[unknown-argument] routers=routers, diff --git a/faststream_outbox/subscriber/config.py b/faststream_outbox/subscriber/config.py index d072b4c..c8564a7 100644 --- a/faststream_outbox/subscriber/config.py +++ b/faststream_outbox/subscriber/config.py @@ -1,6 +1,4 @@ -import datetime as _dt import typing -from collections.abc import Callable from dataclasses import dataclass from faststream._internal.configs import SubscriberSpecificationConfig, SubscriberUsecaseConfig @@ -26,15 +24,6 @@ class OutboxSubscriberConfig(SubscriberUsecaseConfig): release_stuck_interval: float max_deliveries: int | None - @property - def full_queues(self) -> list[str]: - prefix = self._outer_config.prefix or "" - return [f"{prefix}{q}" for q in self.queues] - - @property - def time_source(self) -> Callable[[], _dt.datetime]: - return self._outer_config.time_source - @property def ack_policy(self) -> AckPolicy: if self._ack_policy is EMPTY: diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index c0a37f4..18fe6d2 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -46,9 +46,8 @@ class OutboxSubscriberSpecification(SubscriberSpecification["OutboxBrokerConfig", OutboxSubscriberSpecificationConfig]): @property def name(self) -> str: - prefix = getattr(self._outer_config, "prefix", "") joined = ",".join(self.config.queues) - return f"{prefix}{joined}:{self.call_name}" + return f"{joined}:{self.call_name}" def get_schema(self) -> dict[str, SubscriberSpec]: return { @@ -94,7 +93,7 @@ def _client(self) -> "OutboxClient": @property def _queues(self) -> list[str]: - return 
self._config.full_queues + return self._config.queues @typing.override async def start(self) -> None: @@ -151,13 +150,11 @@ async def _worker_loop(self) -> None: if not row.allow_delivery(max_deliveries=self._config.max_deliveries, logger=logger): await self._flush_terminal(row) continue - # FastStream's AckPolicy middleware catches handler exceptions before - # they reach this except, so this branch is defensive. + # AckPolicy middleware catches handler exceptions; _CaptureExceptionMiddleware + # stashes exc onto row.last_exception before nack runs, so retry strategies + # can branch on exception type. try: await self.consume(row) - except BaseException as exc: # pragma: no cover - row.last_exception = exc - raise finally: await row.assert_state_set(logger) await self._flush_result(row) @@ -183,12 +180,12 @@ async def _flush_terminal(self, row: OutboxInnerMessage) -> None: ) async def _flush_retry(self, row: OutboxInnerMessage) -> None: - if row.acquired_token is None: + if row.acquired_token is None or row.pending_delay_seconds is None: return updated = await self._client.mark_pending_with_lease( row.id, row.acquired_token, - next_attempt_at=row.next_attempt_at, + delay_seconds=row.pending_delay_seconds, attempts_count=row.attempts_count, first_attempt_at=row.first_attempt_at, # ty: ignore[invalid-argument-type] last_attempt_at=row.last_attempt_at, # ty: ignore[invalid-argument-type] @@ -210,7 +207,7 @@ async def _release_stuck_loop(self) -> None: else: if released: self._log( - log_level=logging.WARNING, + log_level=logging.INFO, message=f"release_stuck reset {released} stale rows back to pending", ) await anyio.sleep(interval) diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 030f374..e91d8e4 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -115,7 +115,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 message_id: int, acquired_token: uuid.UUID, *, - next_attempt_at: _dt.datetime, + delay_seconds: 
float, attempts_count: int, first_attempt_at: _dt.datetime, last_attempt_at: _dt.datetime, @@ -123,7 +123,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 for row in self._rows: if row.id == message_id and row.acquired_token == acquired_token: row.state = OutboxState.PENDING.value - row.next_attempt_at = next_attempt_at + row.next_attempt_at = _utcnow() + _dt.timedelta(seconds=max(0.0, delay_seconds)) row.attempts_count = attempts_count row.first_attempt_at = first_attempt_at row.last_attempt_at = last_attempt_at diff --git a/tests/test_fake.py b/tests/test_fake.py index 2a27855..fbbad0c 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -8,6 +8,8 @@ from faststream_outbox import ( ConstantRetry, OutboxBroker, + OutboxRouter, + RetryStrategyProto, TestOutboxBroker, make_outbox_table, ) @@ -548,3 +550,110 @@ async def test_fake_connect_is_noop() -> None: test_broker = TestOutboxBroker(broker) # Direct call exercises L226 even though it's also called during __aenter__. await test_broker._fake_connect(broker) # noqa: SLF001 + + +async def test_retry_strategy_receives_handler_exception() -> None: + """RetryStrategyProto.get_next_attempt_delay must see the raised exception, not None.""" + seen_exceptions: list[BaseException | None] = [] + + class RecordingStrategy(RetryStrategyProto): + def get_next_attempt_delay( + self, + *, + first_attempt_at: _dt.datetime, # noqa: ARG002 + last_attempt_at: _dt.datetime, # noqa: ARG002 + attempts_count: int, # noqa: ARG002 + exception: BaseException | None = None, + ) -> float | None: + seen_exceptions.append(exception) + return None # terminal so the test wraps up promptly + + broker = _make_broker() + + @broker.subscriber( + "orders", + min_fetch_interval=0.01, + max_fetch_interval=0.05, + retry_strategy=RecordingStrategy(), + ) + async def handle(body: str) -> None: + del body + msg = "boom-transient" + raise RuntimeError(msg) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + p, h = 
encode_payload("payload") + test_broker.feed("orders", p, headers=h) + await _wait_until(lambda: seen_exceptions, timeout=3.0) + + assert len(seen_exceptions) >= 1 + exc = seen_exceptions[0] + assert isinstance(exc, RuntimeError) + assert str(exc) == "boom-transient" + + +async def test_retry_strategy_can_branch_on_exception_type() -> None: + """Subclass pattern from retry.py docstring: retry transient, terminate on permanent.""" + attempts: list[str] = [] + + class TransientOnlyStrategy(RetryStrategyProto): + def get_next_attempt_delay( + self, + *, + first_attempt_at: _dt.datetime, # noqa: ARG002 + last_attempt_at: _dt.datetime, # noqa: ARG002 + attempts_count: int, # noqa: ARG002 + exception: BaseException | None = None, + ) -> float | None: + if isinstance(exception, ValueError): + return None # permanent → terminal + return 0.05 # transient → retry + + broker = _make_broker() + + @broker.subscriber( + "orders", + min_fetch_interval=0.01, + max_fetch_interval=0.05, + retry_strategy=TransientOnlyStrategy(), + ) + async def handle(body: str) -> None: + attempts.append(body) + # First call: transient (gets retried via the strategy's retry branch). + # Second call: permanent (terminates via the strategy's None branch). 
+ if len(attempts) == 1: + msg = "transient" + raise RuntimeError(msg) + msg = "permanent" + raise ValueError(msg) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + p, h = encode_payload("body") + test_broker.feed("orders", p, headers=h) + await _wait_until(lambda: not test_broker.fake_client.rows, timeout=5.0) + + assert len(attempts) == 2 # transient retried once, then permanent terminated + + +async def test_router_subscriber_receives_plain_queue_publish() -> None: + """A subscriber registered via OutboxRouter must receive rows whose queue matches literally.""" + received: list[str] = [] + + router = OutboxRouter() + + @router.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + broker = _make_broker() + broker.include_router(router) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + p, h = encode_payload("via-router") + test_broker.feed("orders", p, headers=h) + await _wait_until(lambda: received, timeout=3.0) + + assert received == ["via-router"] diff --git a/tests/test_integration.py b/tests/test_integration.py index c9e7556..2bdd1f6 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -5,7 +5,7 @@ import uuid import pytest -from sqlalchemy import insert, select +from sqlalchemy import insert, select, text from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker from faststream_outbox import ( @@ -121,11 +121,10 @@ async def test_mark_pending_with_lease(pg_engine, outbox_table) -> None: client = OutboxClient(pg_engine, outbox_table) rows = await client.fetch(["orders"], limit=1) msg = rows[0] - future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=10) updated = await client.mark_pending_with_lease( msg.id, msg.acquired_token, # ty: ignore[invalid-argument-type] - next_attempt_at=future, + delay_seconds=600.0, # 10 minutes in the future attempts_count=1, 
first_attempt_at=_dt.datetime.now(tz=_dt.UTC), last_attempt_at=_dt.datetime.now(tz=_dt.UTC), @@ -136,6 +135,34 @@ async def test_mark_pending_with_lease(pg_engine, outbox_table) -> None: assert rows2 == [] +async def test_mark_pending_with_lease_uses_db_clock(pg_engine, outbox_table) -> None: + """next_attempt_at must be computed server-side as now() + delay, not from the worker's clock.""" + async with pg_engine.begin() as conn: + await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) + client = OutboxClient(pg_engine, outbox_table) + rows = await client.fetch(["orders"], limit=1) + msg = rows[0] + delay = 10.0 + # Use clock_timestamp(), not now(): now() returns transaction start time and + # would freeze inside the outer connection. + async with pg_engine.connect() as conn: + db_before = (await conn.execute(text("SELECT clock_timestamp()"))).scalar() + await client.mark_pending_with_lease( + msg.id, + msg.acquired_token, # ty: ignore[invalid-argument-type] + delay_seconds=delay, + attempts_count=1, + first_attempt_at=_dt.datetime.now(tz=_dt.UTC), + last_attempt_at=_dt.datetime.now(tz=_dt.UTC), + ) + async with pg_engine.connect() as conn: + db_after = (await conn.execute(text("SELECT clock_timestamp()"))).scalar() + next_at = (await conn.execute(select(outbox_table.c.next_attempt_at))).scalar_one() + # next_attempt_at was set inside the mark_pending_with_lease transaction whose + # now() falls between db_before and db_after. 
+ assert db_before + _dt.timedelta(seconds=delay) <= next_at <= db_after + _dt.timedelta(seconds=delay) + + async def test_release_stuck_recovers_old_processing_rows(pg_engine, outbox_table) -> None: async with pg_engine.begin() as conn: await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) diff --git a/tests/test_unit.py b/tests/test_unit.py index 4e2558e..5428e98 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -15,6 +15,7 @@ OutboxState, make_outbox_table, ) +from faststream_outbox.client import OutboxClient from faststream_outbox.envelope import _encode_payload from faststream_outbox.message import OutboxInnerMessage, OutboxMessage from faststream_outbox.parser.parser import OutboxParser @@ -101,7 +102,7 @@ def _make_times() -> tuple[_dt.datetime, _dt.datetime]: def test_no_retry_always_terminal() -> None: first, last = _make_times() assert ( - NoRetry().get_next_attempt_at( + NoRetry().get_next_attempt_delay( first_attempt_at=first, last_attempt_at=last, attempts_count=1, @@ -110,53 +111,52 @@ def test_no_retry_always_terminal() -> None: ) -def test_constant_retry_returns_last_plus_delay() -> None: +def test_constant_retry_returns_delay() -> None: first, last = _make_times() - next_at = ConstantRetry(delay_seconds=30).get_next_attempt_at( + delay = ConstantRetry(delay_seconds=30).get_next_attempt_delay( first_attempt_at=first, last_attempt_at=last, attempts_count=1, ) - assert next_at == last + _dt.timedelta(seconds=30) + assert delay == 30.0 def test_constant_retry_max_attempts_reached() -> None: first, last = _make_times() s = ConstantRetry(delay_seconds=1, max_attempts=3) - assert s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=3) is None - assert s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=2) is not None + assert s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=3) is None + assert 
s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=2) is not None def test_constant_retry_max_total_delay_exceeded() -> None: first, last = _make_times() s = ConstantRetry(delay_seconds=100, max_total_delay_seconds=50) - assert s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=1) is None + assert s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=1) is None def test_linear_retry_grows_with_attempts() -> None: first, last = _make_times() s = LinearRetry(initial_delay_seconds=10, step_seconds=5) - n1 = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=1) - n2 = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=3) - assert n1 is not None - assert n2 is not None - assert n2 > n1 + d1 = s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=1) + d2 = s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=3) + assert d1 is not None + assert d2 is not None + assert d2 > d1 def test_exponential_retry_caps_at_max_delay() -> None: first, last = _make_times() s = ExponentialRetry(initial_delay_seconds=1, multiplier=2, max_delay_seconds=10) - next_at = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=10) - assert next_at == last + _dt.timedelta(seconds=10) + delay = s.get_next_attempt_delay(first_attempt_at=first, last_attempt_at=last, attempts_count=10) + assert delay == 10.0 def test_exponential_retry_with_jitter_within_bounds() -> None: first, last = _make_times() s = ExponentialRetry(initial_delay_seconds=10, multiplier=1.0, jitter_factor=0.5) - next_at = s.get_next_attempt_at(first_attempt_at=first, last_attempt_at=last, attempts_count=1) - assert next_at is not None - delta = (next_at - last).total_seconds() - assert 10.0 <= delta <= 15.0 + delay = s.get_next_attempt_delay(first_attempt_at=first, 
last_attempt_at=last, attempts_count=1) + assert delay is not None + assert 10.0 <= delay <= 15.0 # --- OutboxInnerMessage state machine --- @@ -200,7 +200,7 @@ async def test_inner_message_nack_with_strategy_schedules_retry() -> None: await msg.nack() assert not msg.to_delete assert msg.last_attempt_at is not None - assert msg.next_attempt_at > msg.last_attempt_at + assert msg.pending_delay_seconds == 60.0 async def test_inner_message_reject_is_terminal() -> None: @@ -348,7 +348,7 @@ async def second(body: dict) -> None: ... def test_router_can_be_constructed() -> None: - router = OutboxRouter(prefix="svc-") + router = OutboxRouter() assert router is not None @@ -504,14 +504,6 @@ def test_outbox_router_config_engine_state_raises() -> None: _ = cfg.engine_state -def test_outbox_broker_config_uses_default_time_source() -> None: - from faststream_outbox.configs import OutboxBrokerConfig # noqa: PLC0415 - - cfg = OutboxBrokerConfig() - now = cfg.time_source() - assert now.tzinfo is not None # naive datetimes would be a regression - - # --- client --- @@ -533,6 +525,20 @@ async def test_client_fetch_empty_queues_returns_empty() -> None: assert await client.fetch([], limit=10) == [] +def test_advisory_lock_sql_parameterizes_table_name() -> None: + """Table name must be a bound parameter, never interpolated as SQL literal.""" + metadata = MetaData() + hostile = "x'); DROP TABLE x; --" + t = make_outbox_table(metadata, table_name=hostile) + client = OutboxClient(AsyncMock(), t) + + compiled = client._advisory_lock_sql.compile() # noqa: SLF001 + # Table name must NOT appear in the SQL string itself... + assert hostile not in str(compiled) + # ...but must appear as a bound param value. + assert any(hostile in str(v) for v in compiled.params.values()) + + # --- OutboxMessage.reject + assert_state_set logger branch --- @@ -573,19 +579,7 @@ async def handler(body: str) -> None: ... 
assert route is not None -def test_subscriber_config_time_source_property() -> None: - metadata = MetaData() - t = make_outbox_table(metadata) - broker = OutboxBroker(outbox_table=t) - - @broker.subscriber("orders") - async def handle(body: dict) -> None: ... - - sub = next(iter(broker._subscribers)) # noqa: SLF001 - assert callable(sub._config.time_source) # noqa: SLF001 - - -def test_subscriber_specification_name_with_prefix() -> None: +def test_subscriber_specification_name_lists_queues() -> None: metadata = MetaData() t = make_outbox_table(metadata) broker = OutboxBroker(outbox_table=t) @@ -652,7 +646,7 @@ async def test_fake_client_mark_pending_miss() -> None: updated = await client.mark_pending_with_lease( 999, uuid.uuid4(), - next_attempt_at=now, + delay_seconds=0.0, attempts_count=1, first_attempt_at=now, last_attempt_at=now,