diff --git a/CLAUDE.md b/CLAUDE.md index b06cc41..afc6767 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,24 +30,23 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr ### User-owned schema -`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. `OutboxState` is `PENDING` / `PROCESSING` only; terminal failures `DELETE` (no archive, no DLQ). +`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ). -### Three-loop subscriber (`subscriber/usecase.py`) +### Two-loop subscriber (`subscriber/usecase.py`) Per subscriber: -1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE state='processing', acquired_token=:uuid RETURNING *`. Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors. +1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. 
Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors. 2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`. -3. **`_release_stuck_loop`** — flips `processing` rows older than `release_stuck_timeout` back to `pending`, wrapped in a `pg_try_advisory_xact_lock` keyed off the table name so multiple processes don't fight. ### Lease-token invariant — load-bearing -Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and `release_stuck` re-claimed the row, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this. +Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and a newer fetch reclaimed the row with a fresh token, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this. -`release_stuck` computes its cutoff server-side via `make_interval(secs => :timeout)` to be immune to worker/DB clock skew. +`lease_ttl_seconds` (default `60.0`, per-subscriber) **must exceed the P99 handler duration with margin**, otherwise healthy in-flight handlers race their own lease expiry and trigger duplicate deliveries. The lease cutoff is computed server-side via `make_interval(secs => :lease_ttl)` to be immune to worker/DB clock skew. 
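The token-scoped no-op behavior described above can be sketched in plain Python. This is an illustrative in-memory model of the invariant, not the package's code — `rows`, `claim`, and `delete_with_lease` here are stand-ins for the real fetch CTE and `OutboxClient.delete_with_lease`:

```python
import uuid

# In-memory stand-in for the outbox table: id -> row dict.
rows = {1: {"acquired_token": None}}

def claim(row_id: int) -> uuid.UUID:
    """Issue a fresh lease token, as the fetch CTE does on every (re-)claim."""
    token = uuid.uuid4()
    rows[row_id]["acquired_token"] = token
    return token

def delete_with_lease(row_id: int, token: uuid.UUID) -> bool:
    """Models: DELETE ... WHERE id = :id AND acquired_token = :token."""
    if row_id in rows and rows[row_id]["acquired_token"] == token:
        del rows[row_id]
        return True
    return False  # rowcount == 0: the stale holder's write is silently dropped

slow = claim(1)    # a slow handler claims the row
fresh = claim(1)   # its lease expires; a newer fetch re-claims with a new token
assert not delete_with_lease(1, slow)   # stale worker's terminal write is a no-op
assert delete_with_lease(1, fresh)      # only the current lease holder mutates the row
```

The same reasoning applies to `mark_pending_with_lease`: any terminal path that forgets the token filter breaks this invariant.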
### Test broker -`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts) but runs the **real** `OutboxSubscriber` loops — fetch / worker / release-stuck — so tests exercise the actual delivery path. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`). +`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts) but runs the **real** `OutboxSubscriber` loops — fetch / worker — so tests exercise the actual delivery path. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`). ### Engine ownership diff --git a/README.md b/README.md index 2fee0a9..98896e9 100644 --- a/README.md +++ b/README.md @@ -40,21 +40,22 @@ async with session_factory() as session, session.begin(): `broker.publish_batch(*bodies, queue, session, headers=None)` inserts many rows in a single round-trip with the same transactional contract. -A subscriber owns three async loops: +A subscriber owns two async loops: -1. **fetch** — claims due rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE state='processing', acquired_token=:uuid RETURNING *` in a single CTE. +1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed. 2. **workers** (× `max_workers`) — dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`. On failure, the retry strategy decides: schedule another attempt, or terminal `DELETE`. -3. **release-stuck** — periodically flips `processing` rows back to `pending` if their lease is older than `release_stuck_timeout`. 
-Wrapped in a Postgres advisory lock so multiple processes don't compete. The `acquired_token` is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal `DELETE`/`UPDATE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row. +`lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery. + ## Recommended index Add this to your Alembic migration alongside the table: ```sql CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at) - WHERE state = 'pending'; + WHERE acquired_token IS NULL; ``` ## Schema validation @@ -111,8 +112,7 @@ Per-subscriber knobs (passed to `@broker.subscriber("…", …)`): - `max_workers` (default `1`) — concurrent handlers per subscriber. - `fetch_batch_size` (default `10`) — rows claimed per fetch cycle. - `min_fetch_interval` / `max_fetch_interval` (default `1.0` / `10.0` s) — base + ceiling for the adaptive idle backoff with jitter. -- `release_stuck_timeout` (default `300.0` s) — how long a `processing` row may live before being released back to `pending`. -- `release_stuck_interval` (default `release_stuck_timeout / 2`). -- `max_deliveries` (default `None` — unbounded) — total claims (including stuck-recovery re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge. +- `lease_ttl_seconds` (default `60.0` s) — how long a claim is valid before another fetch may reclaim it. **Must exceed your handler's P99 duration with margin.** +- `max_deliveries` (default `None` — unbounded) — total claims (including lease-expiry re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge.
## 📝 [License](LICENSE) diff --git a/faststream_outbox/__init__.py b/faststream_outbox/__init__.py index c9b22fb..6bec73a 100644 --- a/faststream_outbox/__init__.py +++ b/faststream_outbox/__init__.py @@ -7,7 +7,7 @@ RetryStrategyProto, ) from faststream_outbox.router import OutboxRouter -from faststream_outbox.schema import OutboxState, make_outbox_table +from faststream_outbox.schema import make_outbox_table from faststream_outbox.testing import TestOutboxBroker @@ -18,7 +18,6 @@ "NoRetry", "OutboxBroker", "OutboxRouter", - "OutboxState", "RetryStrategyProto", "TestOutboxBroker", "make_outbox_table", diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index b3f5d85..b6b1f31 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -2,13 +2,17 @@ Postgres outbox client. All read/write paths against the outbox table live here. The fetch query is the -load-bearing piece: a single CTE that selects due rows ``FOR UPDATE SKIP LOCKED`` -and immediately ``UPDATE``s them to ``processing`` with a fresh lease token, +load-bearing piece: a single CTE that selects available rows ``FOR UPDATE SKIP LOCKED`` +and immediately ``UPDATE``s them with a fresh lease (``acquired_token`` + ``acquired_at``), ``RETURNING`` the row in one round-trip. +A row is "available" iff its lease is unset *or* its lease has expired +(``acquired_at < now() - lease_ttl_seconds``). This collapses what used to be a +state column plus a separate ``release_stuck`` reaper into a single predicate. + Every terminal write (``delete_with_lease``, ``mark_pending_with_lease``) filters -on ``acquired_token`` so a slow handler whose lease was reclaimed by -``release_stuck`` can no longer mutate that row. +on ``acquired_token`` so a slow handler whose lease was reclaimed by a newer fetch +can no longer mutate that row. 
""" import datetime as _dt @@ -18,7 +22,6 @@ from sqlalchemy import ( Float, - and_, bindparam, delete, func, @@ -30,7 +33,6 @@ ) from faststream_outbox.message import OutboxInnerMessage -from faststream_outbox.schema import OutboxState if TYPE_CHECKING: @@ -51,34 +53,40 @@ class OutboxClient: def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None: self._engine = engine self._table = outbox_table - # Stable advisory-lock key derived from the table name; ``hashtext`` returns int4. - # Parameterized to keep the table name out of SQL string interpolation. - self._advisory_lock_sql = text("SELECT pg_try_advisory_xact_lock(hashtext(:lock_key))").bindparams( - lock_key=f"faststream_outbox:{outbox_table.name}" - ) @property def table(self) -> "Table": return self._table - async def fetch(self, queues: "Sequence[str]", *, limit: int) -> list[OutboxInnerMessage]: + async def fetch( + self, + queues: "Sequence[str]", + *, + limit: int, + lease_ttl_seconds: float, + ) -> list[OutboxInnerMessage]: """ - Atomically claim up to *limit* due rows for the given queue names. + Atomically claim up to *limit* available rows for the given queue names. - Returns the freshly-leased rows. Each row carries ``acquired_token`` which the - worker loop must echo back on the terminal ``DELETE``/``UPDATE``. + A row is available iff its lease is unset (``acquired_token IS NULL``) or its + lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each + carries ``acquired_token`` which the worker loop must echo back on the terminal + ``DELETE``/``UPDATE``. """ if not queues: return [] token = uuid.uuid4() t = self._table + # ``make_interval(secs => :lease_ttl)`` keeps the cutoff computation server-side + # so lease expiry is immune to clock skew between worker and DB hosts. 
+ lease_cutoff = func.now() - func.make_interval(0, 0, 0, 0, 0, 0, bindparam("lease_ttl", type_=Float)) ready = ( select(t.c.id) .where( - t.c.state == OutboxState.PENDING.value, t.c.next_attempt_at <= func.now(), or_(*(t.c.queue == q for q in queues)), + or_(t.c.acquired_token.is_(None), t.c.acquired_at < lease_cutoff), ) .order_by(t.c.next_attempt_at) .limit(limit) @@ -89,7 +97,6 @@ async def fetch(self, queues: "Sequence[str]", *, limit: int) -> list[OutboxInne update(t) .where(t.c.id.in_(select(ready.c.id))) .values( - state=OutboxState.PROCESSING.value, acquired_at=func.now(), acquired_token=token, deliveries_count=t.c.deliveries_count + 1, @@ -97,7 +104,7 @@ async def fetch(self, queues: "Sequence[str]", *, limit: int) -> list[OutboxInne .returning(*t.c) ) async with self._engine.begin() as conn: - result = await conn.execute(stmt) + result = await conn.execute(stmt, {"lease_ttl": max(0.0, lease_ttl_seconds)}) rows = result.mappings().all() return [_row_to_message(dict(row)) for row in rows] @@ -120,7 +127,7 @@ async def mark_pending_with_lease( # noqa: PLR0913 last_attempt_at: _dt.datetime, ) -> bool: """ - Move *message_id* back to ``pending`` for retry, iff it still holds the lease. + Release the lease on *message_id* and reschedule it for retry, iff it still holds the lease. ``next_attempt_at`` is computed server-side as ``now() + delay_seconds`` so retry timing uses the DB clock, not the worker's. Returns True if the row was updated. 
@@ -131,7 +138,6 @@ async def mark_pending_with_lease( # noqa: PLR0913 update(t) .where(t.c.id == message_id, t.c.acquired_token == acquired_token) .values( - state=OutboxState.PENDING.value, next_attempt_at=next_attempt_at_expr, attempts_count=attempts_count, first_attempt_at=first_attempt_at, @@ -144,40 +150,6 @@ async def mark_pending_with_lease( # noqa: PLR0913 result = await conn.execute(stmt, {"delay": max(0.0, delay_seconds)}) return (result.rowcount or 0) > 0 - async def release_stuck(self, *, timeout_seconds: float) -> int: - """ - Flip ``processing`` rows back to ``pending`` once their lease is older than *timeout_seconds*. - - Wrapped in ``pg_try_advisory_xact_lock`` so multiple processes don't fight over - the same rows. Returns the number of rows released (``0`` if the lock was not - acquired — another process is doing the work). - """ - t = self._table - # ``make_interval(secs => :timeout)`` keeps the cutoff computation server-side so - # release_stuck windows are immune to clock skew between worker and DB hosts. - stale_cutoff = func.now() - func.make_interval(0, 0, 0, 0, 0, 0, bindparam("timeout", type_=Float)) - stmt = ( - update(t) - .where( - and_( - t.c.state == OutboxState.PROCESSING.value, - t.c.acquired_at.isnot(None), - t.c.acquired_at < stale_cutoff, - ) - ) - .values( - state=OutboxState.PENDING.value, - acquired_at=None, - acquired_token=None, - ) - ) - async with self._engine.begin() as conn: - lock_result = await conn.execute(self._advisory_lock_sql) - if not lock_result.scalar(): - return 0 - result = await conn.execute(stmt, {"timeout": timeout_seconds}) - return result.rowcount or 0 - async def validate_schema(self) -> None: """ Validate that the database table matches the package's expected columns. 
@@ -207,7 +179,6 @@ def _row_to_message(row: dict) -> OutboxInnerMessage: queue=row["queue"], payload=row["payload"], headers=row["headers"], - state=OutboxState(row["state"]), attempts_count=row["attempts_count"], deliveries_count=row["deliveries_count"], created_at=row["created_at"], diff --git a/faststream_outbox/message.py b/faststream_outbox/message.py index 1548cf7..f1dd7f3 100644 --- a/faststream_outbox/message.py +++ b/faststream_outbox/message.py @@ -2,7 +2,7 @@ Outbox message representations. ``OutboxInnerMessage`` is the in-memory mirror of a row claimed by the fetch loop. -Its ``ack``/``nack``/``reject`` methods only mutate in-memory state — the actual +Its ``ack``/``nack``/``reject`` methods only mutate in-memory intent — the actual ``DELETE`` or ``UPDATE`` is issued by the worker loop, scoped by ``acquired_token`` so a re-claimed row's lease holder is the only writer. @@ -18,8 +18,6 @@ from faststream.message.message import StreamMessage -from faststream_outbox.schema import OutboxState - if TYPE_CHECKING: from faststream._internal.basic_types import LoggerProto @@ -34,17 +32,17 @@ def _utcnow() -> _dt.datetime: @dataclass(kw_only=True) class OutboxInnerMessage: """ - In-memory copy of a claimed outbox row, plus state-machine helpers. + In-memory copy of a claimed outbox row, plus ack/nack/reject intent helpers. - The state machine here is purely *intent* — what should happen next when the - worker loop flushes results. The actual DB write is the worker loop's job. + The ack/nack/reject methods set in-memory intent flags (``to_delete``, + ``pending_delay_seconds``). The worker loop reads those flags and issues the + actual DB write, scoped by ``acquired_token``. 
""" id: int queue: str payload: bytes headers: dict[str, str] | None - state: OutboxState attempts_count: int deliveries_count: int created_at: _dt.datetime diff --git a/faststream_outbox/registrator.py b/faststream_outbox/registrator.py index 7b779cf..7606d00 100644 --- a/faststream_outbox/registrator.py +++ b/faststream_outbox/registrator.py @@ -27,8 +27,7 @@ def subscriber( # ty: ignore[invalid-method-override] fetch_batch_size: int = 10, min_fetch_interval: float = 1.0, max_fetch_interval: float = 10.0, - release_stuck_timeout: float = 300.0, - release_stuck_interval: float | None = None, + lease_ttl_seconds: float = 60.0, max_deliveries: int | None = None, dependencies: Iterable["Dependant"] = (), parser: CustomCallable | None = None, @@ -49,10 +48,7 @@ def subscriber( # ty: ignore[invalid-method-override] fetch_batch_size=fetch_batch_size, min_fetch_interval=min_fetch_interval, max_fetch_interval=max_fetch_interval, - release_stuck_timeout=release_stuck_timeout, - release_stuck_interval=release_stuck_interval - if release_stuck_interval is not None - else release_stuck_timeout / 2, + lease_ttl_seconds=lease_ttl_seconds, max_deliveries=max_deliveries, config=self.config, # ty: ignore[invalid-argument-type] title_=title_, diff --git a/faststream_outbox/router.py b/faststream_outbox/router.py index 55ef3c5..31737c4 100644 --- a/faststream_outbox/router.py +++ b/faststream_outbox/router.py @@ -29,8 +29,7 @@ def __init__( # noqa: PLR0913 fetch_batch_size: int = 10, min_fetch_interval: float = 1.0, max_fetch_interval: float = 10.0, - release_stuck_timeout: float = 300.0, - release_stuck_interval: float | None = None, + lease_ttl_seconds: float = 60.0, max_deliveries: int | None = None, dependencies: Iterable["Dependant"] = (), parser: CustomCallable | None = None, @@ -48,8 +47,7 @@ def __init__( # noqa: PLR0913 fetch_batch_size=fetch_batch_size, min_fetch_interval=min_fetch_interval, max_fetch_interval=max_fetch_interval, - 
release_stuck_timeout=release_stuck_timeout, - release_stuck_interval=release_stuck_interval, + lease_ttl_seconds=lease_ttl_seconds, max_deliveries=max_deliveries, dependencies=dependencies, parser=parser, diff --git a/faststream_outbox/schema.py b/faststream_outbox/schema.py index 100d2fd..7134a17 100644 --- a/faststream_outbox/schema.py +++ b/faststream_outbox/schema.py @@ -6,15 +6,17 @@ index (create it in your migration alongside the table):: CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at) - WHERE state = 'pending'; + WHERE acquired_token IS NULL; + +A row is "available" iff its lease is unset (``acquired_token IS NULL``) or its lease +is expired (``acquired_at < now() - lease_ttl_seconds``). The fetch query reclaims +both cases inline; there is no separate state column or background reaper. """ -import enum from typing import TYPE_CHECKING from sqlalchemy import ( BigInteger, - CheckConstraint, Column, DateTime, Index, @@ -31,25 +33,6 @@ from sqlalchemy import MetaData -class OutboxState(enum.StrEnum): - """ - Outbox row lifecycle. - - User-inserted rows start as ``PENDING``. The fetch loop atomically claims them - via ``FOR UPDATE SKIP LOCKED``, flipping to ``PROCESSING``. After the handler - returns, the row is either ``DELETE``d (ack/reject/terminal nack) or returned - to ``PENDING`` with an updated ``next_attempt_at`` (retryable nack). - """ - - PENDING = "pending" - PROCESSING = "processing" - - -# Allowed values for the state column. Stored as VARCHAR(16) with a CHECK constraint -# rather than a native PG enum so adding states later is a non-breaking migration. -_STATE_VALUES = tuple(s.value for s in OutboxState) - - def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table: """ Build the outbox ``Table`` and attach it to *metadata*. 
@@ -60,7 +43,6 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table The recommended composite partial index for fetch performance is documented in the module docstring above; create it explicitly in your migration. """ - state_check = "state IN (" + ", ".join(f"'{v}'" for v in _STATE_VALUES) + ")" return Table( table_name, metadata, @@ -68,13 +50,6 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table Column("queue", String(255), nullable=False, index=True), Column("payload", LargeBinary, nullable=False), Column("headers", JSONB, nullable=True), - Column( - "state", - String(16), - CheckConstraint(state_check, name=f"{table_name}_state_check"), - nullable=False, - server_default=OutboxState.PENDING.value, - ), Column("attempts_count", BigInteger, nullable=False, server_default="0"), Column("deliveries_count", BigInteger, nullable=False, server_default="0"), Column("created_at", DateTime(timezone=True), nullable=False, server_default=func.now()), diff --git a/faststream_outbox/subscriber/config.py b/faststream_outbox/subscriber/config.py index c8564a7..6e100b0 100644 --- a/faststream_outbox/subscriber/config.py +++ b/faststream_outbox/subscriber/config.py @@ -20,8 +20,7 @@ class OutboxSubscriberConfig(SubscriberUsecaseConfig): fetch_batch_size: int min_fetch_interval: float max_fetch_interval: float - release_stuck_timeout: float - release_stuck_interval: float + lease_ttl_seconds: float max_deliveries: int | None @property diff --git a/faststream_outbox/subscriber/factory.py b/faststream_outbox/subscriber/factory.py index cc532ec..7d891dd 100644 --- a/faststream_outbox/subscriber/factory.py +++ b/faststream_outbox/subscriber/factory.py @@ -19,8 +19,7 @@ def create_subscriber( # noqa: PLR0913 fetch_batch_size: int, min_fetch_interval: float, max_fetch_interval: float, - release_stuck_timeout: float, - release_stuck_interval: float, + lease_ttl_seconds: float, max_deliveries: int | None, config: 
"OutboxBrokerConfig", title_: str | None = None, @@ -35,8 +34,7 @@ def create_subscriber( # noqa: PLR0913 fetch_batch_size=fetch_batch_size, min_fetch_interval=min_fetch_interval, max_fetch_interval=max_fetch_interval, - release_stuck_timeout=release_stuck_timeout, - release_stuck_interval=release_stuck_interval, + lease_ttl_seconds=lease_ttl_seconds, max_deliveries=max_deliveries, ) specification_config = OutboxSubscriberSpecificationConfig( diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 18fe6d2..6fe8833 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -1,16 +1,16 @@ """ Outbox subscriber — the consume loop that backs ``@broker.subscriber("queue")``. -Three async tasks run per subscriber: +Two async tasks run per subscriber: -* ``_fetch_loop`` claims due rows from Postgres and pushes them onto an - in-process queue. Adaptive idle backoff with jitter when the queue is empty. +* ``_fetch_loop`` claims available rows from Postgres and pushes them onto an + in-process queue. A row is available iff its lease is unset *or* expired + (``acquired_at < now() - lease_ttl_seconds``); the fetch CTE reclaims both cases + in one round-trip. Adaptive idle backoff with jitter when the queue is empty. * ``_worker_loop`` (one per ``max_workers``) pulls rows, dispatches via ``consume()``, and writes the terminal state back through the client. Every terminal write is filtered by ``acquired_token`` so a row whose lease was - released doesn't get clobbered. -* ``_release_stuck_loop`` periodically flips ``processing`` rows older than - ``release_stuck_timeout`` back to ``pending`` (advisory-locked, idempotent). + reclaimed by a newer fetch doesn't get clobbered by the stale handler. 
""" import asyncio @@ -104,7 +104,6 @@ async def start(self) -> None: for _ in range(self._config.max_workers): self.add_task(self._worker_loop) self.add_task(self._fetch_loop) - self.add_task(self._release_stuck_loop) @typing.override async def stop(self) -> None: @@ -123,7 +122,11 @@ async def _fetch_loop(self) -> None: await anyio.sleep(base) continue try: - rows = await self._client.fetch(self._queues, limit=min(free, self._config.fetch_batch_size)) + rows = await self._client.fetch( + self._queues, + limit=min(free, self._config.fetch_batch_size), + lease_ttl_seconds=self._config.lease_ttl_seconds, + ) except Exception as e: # noqa: BLE001 self._log(log_level=logging.ERROR, message=f"Outbox fetch error: {e!r}", exc_info=e) error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP) @@ -196,22 +199,6 @@ async def _flush_retry(self, row: OutboxInnerMessage) -> None: message=f"Outbox row {row} lease expired before retry update; skipping", ) - async def _release_stuck_loop(self) -> None: - interval = self._config.release_stuck_interval - timeout = self._config.release_stuck_timeout - while self.running: - try: - released = await self._client.release_stuck(timeout_seconds=timeout) - except Exception as e: # noqa: BLE001 - self._log(log_level=logging.ERROR, message=f"release_stuck error: {e!r}", exc_info=e) - else: - if released: - self._log( - log_level=logging.INFO, - message=f"release_stuck reset {released} stale rows back to pending", - ) - await anyio.sleep(interval) - @typing.override async def get_one(self, *, timeout: float = 5.0) -> typing.NoReturn: msg = "OutboxBroker does not support get_one()" diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index e91d8e4..9ec399f 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -3,8 +3,8 @@ ``TestOutboxBroker`` wraps an ``OutboxBroker`` and swaps in a ``FakeOutboxClient`` backed by a list of dicts. 
The real ``OutboxSubscriber`` runs unmodified — same -fetch / worker / release-stuck loops — so tests exercise the actual delivery -path, not a shortcut. ``feed()`` simulates a row insert. +fetch / worker loops — so tests exercise the actual delivery path, not a shortcut. +``feed()`` simulates a row insert. """ import datetime as _dt @@ -17,7 +17,6 @@ from faststream_outbox.broker import OutboxBroker from faststream_outbox.message import OutboxInnerMessage -from faststream_outbox.schema import OutboxState if typing.TYPE_CHECKING: @@ -36,7 +35,6 @@ class _FakeRow: queue: str payload: bytes headers: dict[str, str] | None - state: str = OutboxState.PENDING.value attempts_count: int = 0 deliveries_count: int = 0 created_at: _dt.datetime = field(default_factory=_utcnow) @@ -81,22 +79,30 @@ def rows(self) -> list[_FakeRow]: def table(self) -> typing.Any: return None - async def fetch(self, queues: "Sequence[str]", *, limit: int) -> list[OutboxInnerMessage]: + async def fetch( + self, + queues: "Sequence[str]", + *, + limit: int, + lease_ttl_seconds: float, + ) -> list[OutboxInnerMessage]: if not queues: return [] now = _utcnow() + lease_cutoff = now - _dt.timedelta(seconds=max(0.0, lease_ttl_seconds)) token = uuid.uuid4() out: list[OutboxInnerMessage] = [] eligible = sorted( ( r for r in self._rows - if r.state == OutboxState.PENDING.value and r.queue in queues and r.next_attempt_at <= now + if r.queue in queues + and r.next_attempt_at <= now + and (r.acquired_token is None or (r.acquired_at is not None and r.acquired_at < lease_cutoff)) ), key=lambda r: r.next_attempt_at, ) for row in eligible[:limit]: - row.state = OutboxState.PROCESSING.value row.acquired_at = now row.acquired_token = token row.deliveries_count += 1 @@ -122,7 +128,6 @@ async def mark_pending_with_lease( # noqa: PLR0913 ) -> bool: for row in self._rows: if row.id == message_id and row.acquired_token == acquired_token: - row.state = OutboxState.PENDING.value row.next_attempt_at = _utcnow() + 
_dt.timedelta(seconds=max(0.0, delay_seconds)) row.attempts_count = attempts_count row.first_attempt_at = first_attempt_at @@ -132,17 +137,6 @@ async def mark_pending_with_lease( # noqa: PLR0913 return True return False - async def release_stuck(self, *, timeout_seconds: float) -> int: - cutoff = _utcnow() - _dt.timedelta(seconds=timeout_seconds) - released = 0 - for row in self._rows: - if row.state == OutboxState.PROCESSING.value and row.acquired_at is not None and row.acquired_at < cutoff: - row.state = OutboxState.PENDING.value - row.acquired_at = None - row.acquired_token = None - released += 1 - return released - async def validate_schema(self) -> None: return @@ -156,7 +150,6 @@ def _to_inner(row: _FakeRow) -> OutboxInnerMessage: queue=row.queue, payload=row.payload, headers=row.headers, - state=OutboxState(row.state), attempts_count=row.attempts_count, deliveries_count=row.deliveries_count, created_at=row.created_at, @@ -225,7 +218,6 @@ def _fake_start(self, broker: OutboxBroker, *args: typing.Any, **kwargs: typing. 
         for _ in range(sub._config.max_workers):  # noqa: SLF001
             sub.add_task(sub._worker_loop)  # noqa: SLF001
         sub.add_task(sub._fetch_loop)  # noqa: SLF001
-        sub.add_task(sub._release_stuck_loop)  # noqa: SLF001

     async def _fake_connect(self, broker: OutboxBroker, *args: typing.Any, **kwargs: typing.Any) -> None:  # noqa: ARG002
         return
diff --git a/tests/conftest.py b/tests/conftest.py
index 5f00f5b..84d5c8f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -35,7 +35,7 @@ async def outbox_table(pg_engine: AsyncEngine) -> AsyncIterator[Table]:
         await conn.run_sync(metadata.create_all)
         await conn.exec_driver_sql(
             f'CREATE INDEX "{table_name}_pending_idx" ON "{table_name}" '
-            f"(queue, next_attempt_at) WHERE state = 'pending'"
+            f"(queue, next_attempt_at) WHERE acquired_token IS NULL"
         )
     yield table
     async with pg_engine.begin() as conn:
diff --git a/tests/test_fake.py b/tests/test_fake.py
index fbbad0c..fe4db9a 100644
--- a/tests/test_fake.py
+++ b/tests/test_fake.py
@@ -1,5 +1,6 @@
 import asyncio
 import datetime as _dt
+import uuid
 from collections.abc import Callable

 import pytest
@@ -14,6 +15,7 @@
     make_outbox_table,
 )
 from faststream_outbox.envelope import _encode_payload as encode_payload
+from faststream_outbox.testing import _FakeRow


 def _make_broker() -> OutboxBroker:
@@ -167,7 +169,7 @@ async def handle(body: dict, correlation_id: str = Context("message.correlation_
     assert seen == ["trace-xyz"]


-async def test_fake_broker_release_stuck_recovers_processing_row() -> None:
+async def test_fake_broker_expired_lease_is_reclaimed() -> None:
     broker = _make_broker()
     received: list[str] = []

@@ -175,20 +177,14 @@ async def test_fake_broker_release_stuck_recovers_processing_row() -> None:
         "orders",
         min_fetch_interval=0.01,
         max_fetch_interval=0.05,
-        release_stuck_timeout=0.1,
-        release_stuck_interval=0.05,
+        lease_ttl_seconds=0.1,
     )
     async def handle(body: str) -> None:
         received.append(body)

     test_broker = TestOutboxBroker(broker)
     async with test_broker:
-        # Manually create a "stuck" processing row with old acquired_at
-        import uuid as _uuid  # noqa: PLC0415
-
-        from faststream_outbox.schema import OutboxState  # noqa: PLC0415
-        from faststream_outbox.testing import _FakeRow  # noqa: PLC0415
-
+        # Manually create a row with an expired lease — fetch must reclaim it.
         old = _dt.datetime.now(tz=_dt.UTC) - _dt.timedelta(seconds=10)
         test_broker.fake_client._rows.append(  # noqa: SLF001
             _FakeRow(
@@ -196,9 +192,8 @@ async def handle(body: str) -> None:
                 queue="orders",
                 payload=encode_payload("stuck-payload")[0],
                 headers=encode_payload("stuck-payload")[1],
-                state=OutboxState.PROCESSING.value,
                 acquired_at=old,
-                acquired_token=_uuid.uuid4(),
+                acquired_token=uuid.uuid4(),
             )
         )
         test_broker.fake_client._next_id = 100  # noqa: SLF001
@@ -243,12 +238,12 @@ def __init__(self) -> None:
             super().__init__()
             self._raised = False

-        async def fetch(self, queues, *, limit):
+        async def fetch(self, queues, *, limit, lease_ttl_seconds):
             if not self._raised:
                 self._raised = True
                 msg = "transient db error"
                 raise RuntimeError(msg)
-            return await super().fetch(queues, limit=limit)
+            return await super().fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)

     broker = _make_broker()
     received: list[str] = []
@@ -297,77 +292,6 @@ async def handle(body: str) -> None:
     await _wait_until(lambda: len(received) == 2, timeout=5.0)


-async def test_release_stuck_loop_recovers_from_client_error() -> None:
-    from faststream_outbox.testing import FakeOutboxClient  # noqa: PLC0415
-
-    class FlakyReleaseStuckClient(FakeOutboxClient):
-        def __init__(self) -> None:
-            super().__init__()
-            self.release_calls = 0
-
-        async def release_stuck(self, *, timeout_seconds):
-            self.release_calls += 1
-            if self.release_calls == 1:
-                msg = "transient"
-                raise RuntimeError(msg)
-            return await super().release_stuck(timeout_seconds=timeout_seconds)
-
-    broker = _make_broker()
-
-    @broker.subscriber(
-        "orders",
-        min_fetch_interval=0.01,
-        max_fetch_interval=0.05,
-        release_stuck_interval=0.05,
-        release_stuck_timeout=0.1,
-    )
-    async def handle(body: str) -> None: ...
-
-    flaky = FlakyReleaseStuckClient()
-    test_broker = TestOutboxBroker(broker)
-    test_broker.fake_client = flaky
-    async with test_broker:
-        await _wait_until(lambda: flaky.release_calls >= 2, timeout=5.0)
-
-
-async def test_release_stuck_loop_logs_when_rows_released() -> None:
-    import uuid as _uuid  # noqa: PLC0415
-
-    from faststream_outbox.schema import OutboxState  # noqa: PLC0415
-    from faststream_outbox.testing import _FakeRow  # noqa: PLC0415
-
-    broker = _make_broker()
-    received: list[str] = []
-
-    @broker.subscriber(
-        "orders",
-        min_fetch_interval=0.01,
-        max_fetch_interval=0.05,
-        release_stuck_interval=0.05,
-        release_stuck_timeout=0.1,
-    )
-    async def handle(body: str) -> None:
-        received.append(body)
-
-    test_broker = TestOutboxBroker(broker)
-    async with test_broker:
-        old = _dt.datetime.now(tz=_dt.UTC) - _dt.timedelta(seconds=10)
-        p, h = encode_payload("stuck")
-        test_broker.fake_client._rows.append(  # noqa: SLF001
-            _FakeRow(
-                id=99,
-                queue="orders",
-                payload=p,
-                headers=h,
-                state=OutboxState.PROCESSING.value,
-                acquired_at=old,
-                acquired_token=_uuid.uuid4(),
-            )
-        )
-        test_broker.fake_client._next_id = 100  # noqa: SLF001
-        await _wait_until(lambda: received, timeout=5.0)
-
-
 async def test_subscriber_with_no_handler_skips_loop_setup() -> None:
     """Calling subscriber.start() with no handler attached early-returns; no loops spawn."""
     from faststream_outbox.subscriber.factory import create_subscriber  # noqa: PLC0415
@@ -382,8 +306,7 @@ async def test_subscriber_with_no_handler_skips_loop_setup() -> None:
         fetch_batch_size=1,
         min_fetch_interval=1.0,
         max_fetch_interval=10.0,
-        release_stuck_timeout=300.0,
-        release_stuck_interval=150.0,
+        lease_ttl_seconds=60.0,
         max_deliveries=None,
         config=broker.config.broker_config,  # type: ignore[arg-type]
     )
@@ -491,8 +414,8 @@ async def test_flush_with_no_lease_token_is_noop() -> None:
     from faststream_outbox.testing import FakeOutboxClient  # noqa: PLC0415

     class TokenStrippingClient(FakeOutboxClient):
-        async def fetch(self, queues, *, limit):
-            rows = await super().fetch(queues, limit=limit)
+        async def fetch(self, queues, *, limit, lease_ttl_seconds):
+            rows = await super().fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)
             for row in rows:
                 row.acquired_token = None  # strip the lease
             return rows
@@ -517,8 +440,8 @@ async def test_flush_retry_with_no_lease_token_is_noop() -> None:
     from faststream_outbox.testing import FakeOutboxClient  # noqa: PLC0415

     class TokenStrippingClient(FakeOutboxClient):
-        async def fetch(self, queues, *, limit):
-            rows = await super().fetch(queues, limit=limit)
+        async def fetch(self, queues, *, limit, lease_ttl_seconds):
+            rows = await super().fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)
             for row in rows:
                 row.acquired_token = None
             return rows
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 2bdd1f6..71a2a8d 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -11,7 +11,6 @@ from faststream_outbox import (
     ConstantRetry,
     OutboxBroker,
-    OutboxState,
     make_outbox_table,
 )
 from faststream_outbox.client import OutboxClient
@@ -62,7 +61,7 @@ async def test_fetch_returns_pending_rows_only(pg_engine, outbox_table) -> None:
         for i in range(3):
             await conn.execute(insert(outbox_table).values(queue="orders", payload=f"p-{i}".encode()))
     client = OutboxClient(pg_engine, outbox_table)
-    rows = await client.fetch(["orders"], limit=10)
+    rows = await client.fetch(["orders"], limit=10, lease_ttl_seconds=60.0)
     assert len(rows) == 3
     assert {r.queue for r in rows} == {"orders"}
     assert all(r.acquired_token is not None for r in rows)
@@ -73,7 +72,7 @@ async def test_fetch_skips_other_queues(pg_engine, outbox_table) -> None:
         await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
         await conn.execute(insert(outbox_table).values(queue="other", payload=b"y"))
     client = OutboxClient(pg_engine, outbox_table)
-    rows = await client.fetch(["orders"], limit=10)
+    rows = await client.fetch(["orders"], limit=10, lease_ttl_seconds=60.0)
     assert len(rows) == 1
     assert rows[0].queue == "orders"

@@ -85,7 +84,7 @@ async def test_two_concurrent_fetches_dont_double_claim(pg_engine, outbox_table)
     client = OutboxClient(pg_engine, outbox_table)

     async def fetch_n(n: int) -> list[int]:
-        rows = await client.fetch(["orders"], limit=n)
+        rows = await client.fetch(["orders"], limit=n, lease_ttl_seconds=60.0)
         return [r.id for r in rows]

     results = await asyncio.gather(fetch_n(10), fetch_n(10))
@@ -98,7 +97,7 @@ async def test_delete_with_lease_succeeds_with_correct_token(pg_engine, outbox_t
     async with pg_engine.begin() as conn:
         await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
     client = OutboxClient(pg_engine, outbox_table)
-    rows = await client.fetch(["orders"], limit=1)
+    rows = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
     assert len(rows) == 1
     deleted = await client.delete_with_lease(rows[0].id, rows[0].acquired_token)  # ty: ignore[invalid-argument-type]
     assert deleted is True
@@ -109,7 +108,7 @@ async def test_delete_with_wrong_token_is_noop(pg_engine, outbox_table) -> None:
     async with pg_engine.begin() as conn:
         await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
     client = OutboxClient(pg_engine, outbox_table)
-    rows = await client.fetch(["orders"], limit=1)
+    rows = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
     deleted = await client.delete_with_lease(rows[0].id, uuid.uuid4())  # wrong token
     assert deleted is False
     assert await _row_count(pg_engine, outbox_table) == 1  # row still there
@@ -119,7 +118,7 @@ async def test_mark_pending_with_lease(pg_engine, outbox_table) -> None:
     async with pg_engine.begin() as conn:
         await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
     client = OutboxClient(pg_engine, outbox_table)
-    rows = await client.fetch(["orders"], limit=1)
+    rows = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
     msg = rows[0]
     updated = await client.mark_pending_with_lease(
         msg.id,
@@ -131,7 +130,7 @@ async def test_mark_pending_with_lease(pg_engine, outbox_table) -> None:
     )
     assert updated is True
     # Refetch — should be empty because next_attempt_at is in the future
-    rows2 = await client.fetch(["orders"], limit=10)
+    rows2 = await client.fetch(["orders"], limit=10, lease_ttl_seconds=60.0)
     assert rows2 == []

@@ -140,7 +139,7 @@ async def test_mark_pending_with_lease_uses_db_clock(pg_engine, outbox_table) ->
     async with pg_engine.begin() as conn:
         await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
     client = OutboxClient(pg_engine, outbox_table)
-    rows = await client.fetch(["orders"], limit=1)
+    rows = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
     msg = rows[0]
     delay = 10.0
     # Use clock_timestamp(), not now(): now() returns transaction start time and
@@ -163,21 +162,34 @@ async def test_mark_pending_with_lease_uses_db_clock(pg_engine, outbox_table) ->
     assert db_before + _dt.timedelta(seconds=delay) <= next_at <= db_after + _dt.timedelta(seconds=delay)


-async def test_release_stuck_recovers_old_processing_rows(pg_engine, outbox_table) -> None:
+async def test_expired_lease_is_reclaimed_by_fetch(pg_engine, outbox_table) -> None:
+    """A row whose lease has expired must be re-claimed by the next fetch with a fresh token."""
     async with pg_engine.begin() as conn:
         await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
     client = OutboxClient(pg_engine, outbox_table)
-    rows = await client.fetch(["orders"], limit=1)
-    assert rows
-    # Backdate acquired_at so release_stuck picks it up
+    first = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
+    assert first
+    original_token = first[0].acquired_token
+    # Backdate acquired_at so the lease is now considered expired by a 60s TTL.
     backdate_sql = f"UPDATE \"{outbox_table.name}\" SET acquired_at = NOW() - INTERVAL '1 hour'"  # noqa: S608
     async with pg_engine.begin() as conn:
         await conn.exec_driver_sql(backdate_sql)
-    released = await client.release_stuck(timeout_seconds=60)
-    assert released == 1
-    # Row should now be claimable again
-    rows2 = await client.fetch(["orders"], limit=10)
-    assert len(rows2) == 1
+    second = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
+    assert len(second) == 1
+    assert second[0].id == first[0].id
+    assert second[0].acquired_token != original_token  # fresh lease holder
+
+
+async def test_unexpired_lease_is_not_reclaimed_by_fetch(pg_engine, outbox_table) -> None:
+    """A still-valid lease must NOT be reclaimed by another fetch."""
+    async with pg_engine.begin() as conn:
+        await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x"))
+    client = OutboxClient(pg_engine, outbox_table)
+    first = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
+    assert first
+    # Lease was just set; a fresh fetch with a 60s TTL must find nothing.
+    second = await client.fetch(["orders"], limit=1, lease_ttl_seconds=60.0)
+    assert second == []


 async def test_end_to_end_subscriber_delivers_inserted_row(pg_engine, outbox_table) -> None:
@@ -238,29 +250,6 @@ async def _check_deleted() -> bool:
     raise AssertionError(msg)  # pragma: no cover


-async def test_subscriber_state_machine_uses_pending_value(outbox_table) -> None:
-    """Sanity: the OutboxState constants match the column CHECK constraint."""
-    states = list(outbox_table.c.state.constraints)
-    assert any("'pending'" in str(c.sqltext) for c in states if hasattr(c, "sqltext"))
-    assert OutboxState.PENDING.value == "pending"
-    assert OutboxState.PROCESSING.value == "processing"
-
-
-async def test_release_stuck_returns_zero_when_lock_held(pg_engine, outbox_table) -> None:
-    """If another process holds the advisory lock, release_stuck no-ops and returns 0."""
-    from sqlalchemy import text  # noqa: PLC0415
-
-    client = OutboxClient(pg_engine, outbox_table)
-    lock_key = f"faststream_outbox:{outbox_table.name}"
-
-    async with pg_engine.connect() as holder, holder.begin():
-        # Acquire the same advisory lock the client uses (xact-scoped).
-        await holder.execute(text(f"SELECT pg_advisory_xact_lock(hashtext('{lock_key}'))"))
-        # While held, release_stuck should fail to acquire and return 0.
-        released = await client.release_stuck(timeout_seconds=60)
-        assert released == 0
-
-
 async def test_validate_schema_fails_when_columns_missing(pg_engine, outbox_table) -> None:
     """Drop a column the package expects and verify validate_schema reports it."""
     drop_sql = f'ALTER TABLE "{outbox_table.name}" DROP COLUMN headers'
diff --git a/tests/test_unit.py b/tests/test_unit.py
index 5428e98..843131b 100644
--- a/tests/test_unit.py
+++ b/tests/test_unit.py
@@ -12,7 +12,6 @@ from faststream_outbox import (
     NoRetry,
     OutboxBroker,
     OutboxRouter,
-    OutboxState,
     make_outbox_table,
 )
 from faststream_outbox.client import OutboxClient
@@ -40,7 +39,6 @@ def test_make_outbox_table_columns_present() -> None:
         "queue",
         "payload",
         "headers",
-        "state",
         "attempts_count",
         "deliveries_count",
         "created_at",
@@ -168,7 +166,6 @@ def _make_msg(**overrides: object) -> OutboxInnerMessage:
         "queue": "q",
         "payload": b"p",
         "headers": None,
-        "state": OutboxState.PROCESSING,
         "attempts_count": 0,
         "deliveries_count": 1,
         "created_at": _dt.datetime.now(tz=_dt.UTC),
@@ -508,7 +505,6 @@ def test_outbox_router_config_engine_state_raises() -> None:


 def test_client_table_property() -> None:
-    from faststream_outbox.client import OutboxClient  # noqa: PLC0415

     metadata = MetaData()
     t = make_outbox_table(metadata)
@@ -517,26 +513,11 @@ def test_client_table_property() -> None:


 async def test_client_fetch_empty_queues_returns_empty() -> None:
-    from faststream_outbox.client import OutboxClient  # noqa: PLC0415

     metadata = MetaData()
     t = make_outbox_table(metadata)
     client = OutboxClient(AsyncMock(), t)
-    assert await client.fetch([], limit=10) == []
-
-
-def test_advisory_lock_sql_parameterizes_table_name() -> None:
-    """Table name must be a bound parameter, never interpolated as SQL literal."""
-    metadata = MetaData()
-    hostile = "x'); DROP TABLE x; --"
-    t = make_outbox_table(metadata, table_name=hostile)
-    client = OutboxClient(AsyncMock(), t)
-
-    compiled = client._advisory_lock_sql.compile()  # noqa: SLF001
-    # Table name must NOT appear in the SQL string itself...
-    assert hostile not in str(compiled)
-    # ...but must appear as a bound param value.
-    assert any(hostile in str(v) for v in compiled.params.values())
+    assert await client.fetch([], limit=10, lease_ttl_seconds=60.0) == []


 # --- OutboxMessage.reject + assert_state_set logger branch ---
@@ -628,7 +609,7 @@ async def test_fake_client_fetch_empty_queues() -> None:
     from faststream_outbox.testing import FakeOutboxClient  # noqa: PLC0415

     client = FakeOutboxClient()
-    assert await client.fetch([], limit=10) == []
+    assert await client.fetch([], limit=10, lease_ttl_seconds=60.0) == []


 async def test_fake_client_delete_miss() -> None: