Merged
13 changes: 6 additions & 7 deletions CLAUDE.md
@@ -30,24 +30,23 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr

### User-owned schema

`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. `OutboxState` is `PENDING` / `PROCESSING` only; terminal failures `DELETE` (no archive, no DLQ).
`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ).
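The availability predicate can be sketched as a toy in-memory model (field names follow the schema described here; the real check runs server-side inside the fetch query's WHERE clause, not in Python):

```python
import datetime as dt

LEASE_TTL = 60.0  # seconds; mirrors the per-subscriber `lease_ttl_seconds` default

def available(row: dict, now: dt.datetime) -> bool:
    """A row is claimable iff its lease is unset or expired."""
    if row["acquired_token"] is None:
        return True
    return (now - row["acquired_at"]).total_seconds() > LEASE_TTL

now = dt.datetime(2024, 1, 1, 12, 0, 0)
assert available({"acquired_token": None, "acquired_at": None}, now)       # never leased
assert not available(
    {"acquired_token": "t", "acquired_at": now - dt.timedelta(seconds=5)}, now
)  # healthy in-flight lease
assert available(
    {"acquired_token": "t", "acquired_at": now - dt.timedelta(seconds=120)}, now
)  # expired lease: reclaimed inline by the next fetch
```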

### Three-loop subscriber (`subscriber/usecase.py`)
### Two-loop subscriber (`subscriber/usecase.py`)

Per subscriber:
1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE state='processing', acquired_token=:uuid RETURNING *`. Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors.
1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors.
2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`.
3. **`_release_stuck_loop`** — flips `processing` rows older than `release_stuck_timeout` back to `pending`, wrapped in a `pg_try_advisory_xact_lock` keyed off the table name so multiple processes don't fight.

### Lease-token invariant — load-bearing

Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and `release_stuck` re-claimed the row, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this.
Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and a newer fetch reclaimed the row with a fresh token, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this.
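A minimal in-memory sketch of that invariant (illustrative only — the real write is a SQL `DELETE … WHERE id = :id AND acquired_token = :token`, and the helper name here just mirrors the method above):

```python
import uuid

def delete_with_lease(rows: list[dict], message_id: int, token: uuid.UUID) -> bool:
    """Stand-in for the token-scoped DELETE; True iff a row was removed (rowcount == 1)."""
    for i, row in enumerate(rows):
        if row["id"] == message_id and row["acquired_token"] == token:
            del rows[i]
            return True
    return False  # rowcount == 0: a newer fetch holds the lease; this write is silently dropped

old_token, new_token = uuid.uuid4(), uuid.uuid4()
rows = [{"id": 1, "acquired_token": new_token}]       # row reclaimed with a fresh token
assert delete_with_lease(rows, 1, old_token) is False  # slow handler's write: no-op
assert delete_with_lease(rows, 1, new_token) is True   # current lease holder wins
```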

`release_stuck` computes its cutoff server-side via `make_interval(secs => :timeout)` to be immune to worker/DB clock skew.
`lease_ttl_seconds` (default `60.0`, per-subscriber) **must exceed the P99 handler duration with margin**, otherwise healthy in-flight handlers race their own lease expiry and trigger duplicate deliveries. The lease cutoff is computed server-side via `make_interval(secs => :lease_ttl)` to be immune to worker/DB clock skew.
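The clock-skew point can be illustrated with a toy comparison (all numbers assumed): computed against a skewed worker clock, a healthy lease looks expired and would be spuriously reclaimed; computed on the DB clock it is not.

```python
import datetime as dt

LEASE_TTL = 60.0
db_now = dt.datetime(2024, 1, 1, 12, 0, 0)
worker_now = db_now + dt.timedelta(seconds=30)    # worker clock runs 30 s fast
acquired_at = db_now - dt.timedelta(seconds=40)   # lease is only 40 s old (DB time)

# Server-side cutoff (what `make_interval` buys you): lease is still healthy.
assert not (db_now - acquired_at).total_seconds() > LEASE_TTL
# Worker-side cutoff: skew makes the lease look 70 s old — a spurious reclaim.
assert (worker_now - acquired_at).total_seconds() > LEASE_TTL
```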

### Test broker

`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts) but runs the **real** `OutboxSubscriber` loops — fetch / worker / release-stuck — so tests exercise the actual delivery path. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`).
`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts) but runs the **real** `OutboxSubscriber` loops — fetch / worker — so tests exercise the actual delivery path. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`).

### Engine ownership

14 changes: 7 additions & 7 deletions README.md
@@ -40,21 +40,22 @@ async with session_factory() as session, session.begin():

`broker.publish_batch(*bodies, queue, session, headers=None)` inserts many rows in a single round-trip with the same transactional contract.

A subscriber owns three async loops:
A subscriber owns two async loops:

1. **fetch** — claims due rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE state='processing', acquired_token=:uuid RETURNING *` in a single CTE.
1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed.
2. **workers** (× `max_workers`) — dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`. On failure, the retry strategy decides: schedule another attempt, or terminal `DELETE`.
3. **release-stuck** — periodically flips `processing` rows back to `pending` if their lease is older than `release_stuck_timeout`. Wrapped in a Postgres advisory lock so multiple processes don't compete.

The `acquired_token` is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal `DELETE`/`UPDATE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row.

`lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery.

## Recommended index

Add this to your Alembic migration alongside the table:

```sql
CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at)
WHERE state = 'pending';
WHERE acquired_token IS NULL;
```

## Schema validation
@@ -111,8 +112,7 @@ Per-subscriber knobs (passed to `@broker.subscriber("…", …)`):
- `max_workers` (default `1`) — concurrent handlers per subscriber.
- `fetch_batch_size` (default `10`) — rows claimed per fetch cycle.
- `min_fetch_interval` / `max_fetch_interval` (default `1.0` / `10.0` s) — base + ceiling for the adaptive idle backoff with jitter.
- `release_stuck_timeout` (default `300.0` s) — how long a `processing` row may live before being released back to `pending`.
- `release_stuck_interval` (default `release_stuck_timeout / 2`).
- `max_deliveries` (default `None` — unbounded) — total claims (including stuck-recovery re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge.
- `lease_ttl_seconds` (default `60.0` s) — how long a claim is valid before another fetch may reclaim it. **Must exceed your handler's P99 duration with margin.**
- `max_deliveries` (default `None` — unbounded) — total claims (including lease-expiry re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge.
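The idle-backoff knobs above can be sketched roughly as follows (the package's exact formula is not shown in this diff, so treat the doubling-plus-jitter shape as an assumption):

```python
import random

MIN_FETCH_INTERVAL = 1.0   # `min_fetch_interval`
MAX_FETCH_INTERVAL = 10.0  # `max_fetch_interval`
_BACKOFF_EXP_CAP = 30      # caps the exponent so 2**n stays bounded

def next_idle_delay(consecutive_empty_fetches: int) -> float:
    """Delay before the next fetch after a run of empty results; resets on any hit."""
    exp = min(consecutive_empty_fetches, _BACKOFF_EXP_CAP)
    delay = min(MIN_FETCH_INTERVAL * (2 ** exp), MAX_FETCH_INTERVAL)
    return delay * random.uniform(0.5, 1.0)  # jitter de-synchronizes competing workers

assert next_idle_delay(0) <= MIN_FETCH_INTERVAL
assert all(next_idle_delay(n) <= MAX_FETCH_INTERVAL for n in range(100))
```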

## 📝 [License](LICENSE)
3 changes: 1 addition & 2 deletions faststream_outbox/__init__.py
@@ -7,7 +7,7 @@
RetryStrategyProto,
)
from faststream_outbox.router import OutboxRouter
from faststream_outbox.schema import OutboxState, make_outbox_table
from faststream_outbox.schema import make_outbox_table
from faststream_outbox.testing import TestOutboxBroker


@@ -18,7 +18,6 @@
"NoRetry",
"OutboxBroker",
"OutboxRouter",
"OutboxState",
"RetryStrategyProto",
"TestOutboxBroker",
"make_outbox_table",
81 changes: 26 additions & 55 deletions faststream_outbox/client.py
@@ -2,13 +2,17 @@
Postgres outbox client.

All read/write paths against the outbox table live here. The fetch query is the
load-bearing piece: a single CTE that selects due rows ``FOR UPDATE SKIP LOCKED``
and immediately ``UPDATE``s them to ``processing`` with a fresh lease token,
load-bearing piece: a single CTE that selects available rows ``FOR UPDATE SKIP LOCKED``
and immediately ``UPDATE``s them with a fresh lease (``acquired_token`` + ``acquired_at``),
``RETURNING`` the row in one round-trip.

A row is "available" iff its lease is unset *or* its lease has expired
(``acquired_at < now() - lease_ttl_seconds``). This collapses what used to be a
state column plus a separate ``release_stuck`` reaper into a single predicate.

Every terminal write (``delete_with_lease``, ``mark_pending_with_lease``) filters
on ``acquired_token`` so a slow handler whose lease was reclaimed by
``release_stuck`` can no longer mutate that row.
on ``acquired_token`` so a slow handler whose lease was reclaimed by a newer fetch
can no longer mutate that row.
"""

import datetime as _dt
@@ -18,7 +22,6 @@

from sqlalchemy import (
Float,
and_,
bindparam,
delete,
func,
@@ -30,7 +33,6 @@
)

from faststream_outbox.message import OutboxInnerMessage
from faststream_outbox.schema import OutboxState


if TYPE_CHECKING:
@@ -51,34 +53,40 @@ class OutboxClient:
def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None:
self._engine = engine
self._table = outbox_table
# Stable advisory-lock key derived from the table name; ``hashtext`` returns int4.
# Parameterized to keep the table name out of SQL string interpolation.
self._advisory_lock_sql = text("SELECT pg_try_advisory_xact_lock(hashtext(:lock_key))").bindparams(
lock_key=f"faststream_outbox:{outbox_table.name}"
)

@property
def table(self) -> "Table":
return self._table

async def fetch(self, queues: "Sequence[str]", *, limit: int) -> list[OutboxInnerMessage]:
async def fetch(
self,
queues: "Sequence[str]",
*,
limit: int,
lease_ttl_seconds: float,
) -> list[OutboxInnerMessage]:
"""
Atomically claim up to *limit* due rows for the given queue names.
Atomically claim up to *limit* available rows for the given queue names.

Returns the freshly-leased rows. Each row carries ``acquired_token`` which the
worker loop must echo back on the terminal ``DELETE``/``UPDATE``.
A row is available iff its lease is unset (``acquired_token IS NULL``) or its
lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each
carries ``acquired_token`` which the worker loop must echo back on the terminal
``DELETE``/``UPDATE``.
"""
if not queues:
return []
token = uuid.uuid4()
t = self._table

# ``make_interval(secs => :lease_ttl)`` keeps the cutoff computation server-side
# so lease expiry is immune to clock skew between worker and DB hosts.
lease_cutoff = func.now() - func.make_interval(0, 0, 0, 0, 0, 0, bindparam("lease_ttl", type_=Float))
ready = (
select(t.c.id)
.where(
t.c.state == OutboxState.PENDING.value,
t.c.next_attempt_at <= func.now(),
or_(*(t.c.queue == q for q in queues)),
or_(t.c.acquired_token.is_(None), t.c.acquired_at < lease_cutoff),
)
.order_by(t.c.next_attempt_at)
.limit(limit)
@@ -89,15 +97,14 @@ async def fetch(self, queues: "Sequence[str]", *, limit: int) -> list[OutboxInne
update(t)
.where(t.c.id.in_(select(ready.c.id)))
.values(
state=OutboxState.PROCESSING.value,
acquired_at=func.now(),
acquired_token=token,
deliveries_count=t.c.deliveries_count + 1,
)
.returning(*t.c)
)
async with self._engine.begin() as conn:
result = await conn.execute(stmt)
result = await conn.execute(stmt, {"lease_ttl": max(0.0, lease_ttl_seconds)})
rows = result.mappings().all()
return [_row_to_message(dict(row)) for row in rows]

@@ -120,7 +127,7 @@ async def mark_pending_with_lease(  # noqa: PLR0913
last_attempt_at: _dt.datetime,
) -> bool:
"""
Move *message_id* back to ``pending`` for retry, iff it still holds the lease.
Release the lease on *message_id* and reschedule it for retry, iff it still holds the lease.

``next_attempt_at`` is computed server-side as ``now() + delay_seconds`` so
retry timing uses the DB clock, not the worker's. Returns True if the row was updated.
@@ -131,7 +138,6 @@
update(t)
.where(t.c.id == message_id, t.c.acquired_token == acquired_token)
.values(
state=OutboxState.PENDING.value,
next_attempt_at=next_attempt_at_expr,
attempts_count=attempts_count,
first_attempt_at=first_attempt_at,
@@ -144,40 +150,6 @@
result = await conn.execute(stmt, {"delay": max(0.0, delay_seconds)})
return (result.rowcount or 0) > 0

async def release_stuck(self, *, timeout_seconds: float) -> int:
"""
Flip ``processing`` rows back to ``pending`` once their lease is older than *timeout_seconds*.

Wrapped in ``pg_try_advisory_xact_lock`` so multiple processes don't fight over
the same rows. Returns the number of rows released (``0`` if the lock was not
acquired — another process is doing the work).
"""
t = self._table
# ``make_interval(secs => :timeout)`` keeps the cutoff computation server-side so
# release_stuck windows are immune to clock skew between worker and DB hosts.
stale_cutoff = func.now() - func.make_interval(0, 0, 0, 0, 0, 0, bindparam("timeout", type_=Float))
stmt = (
update(t)
.where(
and_(
t.c.state == OutboxState.PROCESSING.value,
t.c.acquired_at.isnot(None),
t.c.acquired_at < stale_cutoff,
)
)
.values(
state=OutboxState.PENDING.value,
acquired_at=None,
acquired_token=None,
)
)
async with self._engine.begin() as conn:
lock_result = await conn.execute(self._advisory_lock_sql)
if not lock_result.scalar():
return 0
result = await conn.execute(stmt, {"timeout": timeout_seconds})
return result.rowcount or 0

async def validate_schema(self) -> None:
"""
Validate that the database table matches the package's expected columns.
@@ -207,7 +179,6 @@ def _row_to_message(row: dict) -> OutboxInnerMessage:
queue=row["queue"],
payload=row["payload"],
headers=row["headers"],
state=OutboxState(row["state"]),
attempts_count=row["attempts_count"],
deliveries_count=row["deliveries_count"],
created_at=row["created_at"],
12 changes: 5 additions & 7 deletions faststream_outbox/message.py
@@ -2,7 +2,7 @@
Outbox message representations.

``OutboxInnerMessage`` is the in-memory mirror of a row claimed by the fetch loop.
Its ``ack``/``nack``/``reject`` methods only mutate in-memory state — the actual
Its ``ack``/``nack``/``reject`` methods only mutate in-memory intent — the actual
``DELETE`` or ``UPDATE`` is issued by the worker loop, scoped by ``acquired_token``
so a re-claimed row's lease holder is the only writer.

@@ -18,8 +18,6 @@

from faststream.message.message import StreamMessage

from faststream_outbox.schema import OutboxState


if TYPE_CHECKING:
from faststream._internal.basic_types import LoggerProto
@@ -34,17 +32,17 @@ def _utcnow() -> _dt.datetime:
@dataclass(kw_only=True)
class OutboxInnerMessage:
"""
In-memory copy of a claimed outbox row, plus state-machine helpers.
In-memory copy of a claimed outbox row, plus ack/nack/reject intent helpers.

The state machine here is purely *intent* — what should happen next when the
worker loop flushes results. The actual DB write is the worker loop's job.
The ack/nack/reject methods set in-memory intent flags (``to_delete``,
``pending_delay_seconds``). The worker loop reads those flags and issues the
actual DB write, scoped by ``acquired_token``.
"""

id: int
queue: str
payload: bytes
headers: dict[str, str] | None
state: OutboxState
attempts_count: int
deliveries_count: int
created_at: _dt.datetime
8 changes: 2 additions & 6 deletions faststream_outbox/registrator.py
@@ -27,8 +27,7 @@ def subscriber(  # ty: ignore[invalid-method-override]
fetch_batch_size: int = 10,
min_fetch_interval: float = 1.0,
max_fetch_interval: float = 10.0,
release_stuck_timeout: float = 300.0,
release_stuck_interval: float | None = None,
lease_ttl_seconds: float = 60.0,
max_deliveries: int | None = None,
dependencies: Iterable["Dependant"] = (),
parser: CustomCallable | None = None,
@@ -49,10 +48,7 @@
fetch_batch_size=fetch_batch_size,
min_fetch_interval=min_fetch_interval,
max_fetch_interval=max_fetch_interval,
release_stuck_timeout=release_stuck_timeout,
release_stuck_interval=release_stuck_interval
if release_stuck_interval is not None
else release_stuck_timeout / 2,
lease_ttl_seconds=lease_ttl_seconds,
max_deliveries=max_deliveries,
config=self.config, # ty: ignore[invalid-argument-type]
title_=title_,
6 changes: 2 additions & 4 deletions faststream_outbox/router.py
@@ -29,8 +29,7 @@ def __init__(  # noqa: PLR0913
fetch_batch_size: int = 10,
min_fetch_interval: float = 1.0,
max_fetch_interval: float = 10.0,
release_stuck_timeout: float = 300.0,
release_stuck_interval: float | None = None,
lease_ttl_seconds: float = 60.0,
max_deliveries: int | None = None,
dependencies: Iterable["Dependant"] = (),
parser: CustomCallable | None = None,
@@ -48,8 +47,7 @@
fetch_batch_size=fetch_batch_size,
min_fetch_interval=min_fetch_interval,
max_fetch_interval=max_fetch_interval,
release_stuck_timeout=release_stuck_timeout,
release_stuck_interval=release_stuck_interval,
lease_ttl_seconds=lease_ttl_seconds,
max_deliveries=max_deliveries,
dependencies=dependencies,
parser=parser,