Merged
16 changes: 13 additions & 3 deletions CLAUDE.md
@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

## Project

`faststream-outbox` is a FastStream broker integration that uses a Postgres table as the message queue (transactional outbox pattern). Postgres-only at v0; polling-only (no LISTEN/NOTIFY).
`faststream-outbox` is a FastStream broker integration that uses a Postgres table as the message queue (transactional outbox pattern). Postgres-only at v0. Subscribers poll the table and use LISTEN/NOTIFY to short-circuit idle waits.

## Commands

@@ -22,12 +22,22 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr

### Producer side

`broker.publish(body, *, queue, session, headers=None, correlation_id=None)` and `broker.publish_batch(*bodies, queue, session, ...)` insert outbox rows through the caller's `AsyncSession` (`session.execute(insert(table).values(...))`). They do **not** flush, commit, or open their own transaction — the row must commit with the caller's domain writes. Both reject anything that is not an `AsyncSession` with `TypeError`.
`broker.publish(body, *, queue, session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None)` and `broker.publish_batch(*bodies, queue, session, headers=None, activate_in=None, activate_at=None)` insert outbox rows through the caller's `AsyncSession`. They do **not** flush, commit, or open their own transaction — the row must commit with the caller's domain writes. Both reject anything that is not an `AsyncSession` with `TypeError`. `publish` returns the inserted row's id (or `None` on `timer_id` conflict); `publish_batch` returns nothing and does not accept `timer_id` (per-row dedup makes no sense in a batch).

`broker.request` raises `NotImplementedError` (outbox is fire-and-forget). `OutboxRegistrator.publisher` also raises. The `_NoProducer` stub exists only to satisfy FastStream's broker producer slot.

`_encode_payload` (in `envelope.py`) is the internal helper that turns `body` into `(payload_bytes, headers_dict)`. Not exported.

### Timers (delayed delivery)

`activate_in: timedelta` / `activate_at: datetime` (mutually exclusive) set `next_attempt_at` so the row is invisible to fetch until the gate opens — the `next_attempt_at <= now()` predicate in the fetch CTE is what gates eligibility, so no subscriber-side change is needed for scheduling. For `publish`, `next_attempt_at` is computed server-side via `now() + make_interval(secs => :s)` to stay clock-skew-safe; for `publish_batch` it's client-side (`datetime.now(UTC) + activate_in`) because executemany doesn't compose cleanly with column-level SQL expressions, and the few-ms drift is harmless for user-supplied scheduling.

`timer_id` (single `publish` only) flows into a `String(255)` column with a partial unique index on `(queue, timer_id) WHERE timer_id IS NOT NULL`. The producer switches to `pg_insert(...).on_conflict_do_nothing(index_elements=[queue, timer_id], index_where=timer_id IS NOT NULL)` so re-publishing the same id is a silent no-op (returns `None`). NOTIFY is skipped when `activate_in`/`activate_at` is set OR the conflict path returned no row — both cases would either wake listeners that find nothing, or wake them prematurely.

`broker.cancel_timer(*, queue, timer_id, session)` issues `DELETE WHERE queue=? AND timer_id=? AND acquired_token IS NULL` on the caller's session — the `acquired_token IS NULL` guard is load-bearing: it preserves the lease-token invariant by refusing to clobber a row whose handler is already in flight (returns `False` in that case; the delivery completes normally).

Latency floor: timer firing latency is bounded by `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. NOTIFY does not help here — listeners can't act on a future row. Sub-second precision is not a goal of this broker.

### User-owned schema

`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare the partial index `(queue, next_attempt_at) WHERE acquired_token IS NULL` on the table itself, so Alembic autogenerate brings it up. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ).
@@ -38,7 +48,7 @@ Per subscriber:
1. **`_fetch_loop`** — owns a long-lived `AsyncConnection` for the fetch CTE and a separate raw asyncpg connection for `LISTEN outbox_<table>`. Single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` — idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error), the loop logs once and falls back to polling. On any DB error the connections are closed, the loop backs off exponentially (capped by `_BACKOFF_EXP_CAP=30`), and reopens. Test broker (no real engine) skips the persistent-connection / LISTEN path entirely and uses `client.fetch(...)` per iteration.
2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`.

Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_<table>', queue)` on the caller's session right after the INSERT. NOTIFY is transactional listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.
Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_<table>', queue)` on the caller's session right after the INSERT, **except** when the row is future-dated (`activate_in`/`activate_at` set) or a `timer_id` conflict made the insert a no-op — both cases skip NOTIFY since listeners can't act on the result. NOTIFY is transactional: listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.

Channel naming convention: `outbox_<table_name>`. Postgres limits identifiers to 63 chars, so users with table names longer than ~56 chars will silently lose the NOTIFY wake-up and degrade to polling.
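A quick preflight check for that limit (a sketch; 63 is `NAMEDATALEN - 1` in a stock Postgres build, and `len("outbox_") == 7` gives the ~56-char table-name budget):

```python
PG_MAX_IDENTIFIER = 63  # NAMEDATALEN - 1 in a default Postgres build


def notify_channel(table_name: str) -> str:
    """Channel name used for LISTEN/NOTIFY, per the package's convention."""
    return f"outbox_{table_name}"


def channel_fits(table_name: str) -> bool:
    # Postgres truncates over-long identifiers on the LISTEN side, so an
    # over-long channel silently degrades the subscriber to polling.
    return len(notify_channel(table_name).encode("utf-8")) <= PG_MAX_IDENTIFIER
```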

38 changes: 36 additions & 2 deletions README.md
@@ -36,9 +36,9 @@ async with session_factory() as session, session.begin():

`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` that you attach to your own `MetaData` and migrate via Alembic. The package does **not** own your schema; it only describes the columns it needs.

`broker.publish(body, *, queue, session, headers=None, correlation_id=None)` inserts one outbox row through the caller's `AsyncSession`. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an `async with session.begin():` block.
`broker.publish(body, *, queue, session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None)` inserts one outbox row through the caller's `AsyncSession`. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an `async with session.begin():` block. See [Timers](#timers-delayed-delivery) for `activate_in` / `activate_at` / `timer_id`.

`broker.publish_batch(*bodies, queue, session, headers=None)` inserts many rows in a single round-trip with the same transactional contract.
`broker.publish_batch(*bodies, queue, session, headers=None, activate_in=None, activate_at=None)` inserts many rows in a single round-trip with the same transactional contract.

A subscriber owns two async loops:

@@ -49,6 +49,40 @@ The `acquired_token` is critical: a slow handler whose lease expired and was re-

`lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery.
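One way to size it, as an illustrative rule of thumb (the 3x margin is an assumption, not library guidance):

```python
def safe_lease_ttl(handler_p99_seconds: float, *, margin: float = 3.0) -> float:
    """Rule of thumb: the lease must outlive the slowest realistic handler run,
    or an in-flight handler races its own lease expiry, the row is re-claimed,
    and the message is delivered twice."""
    if handler_p99_seconds <= 0:
        raise ValueError("P99 must be positive")
    return handler_p99_seconds * margin
```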

## Timers (delayed delivery)

Schedule a publish to fire later by passing `activate_in` (relative) or `activate_at` (absolute, tz-aware) — exactly one. Pass `timer_id` to deduplicate per `(queue, timer_id)`; cancel a not-yet-leased timer with `broker.cancel_timer(...)`.

```python
import datetime as dt

# Fire 30 seconds from now, deduplicated by timer_id:
await broker.publish(
{"order_id": 1},
queue="orders",
session=session,
activate_in=dt.timedelta(seconds=30),
timer_id=f"order-confirm-{order.id}",
)

# Fire at a specific UTC instant:
await broker.publish(
{"x": 1}, queue="orders", session=session,
activate_at=dt.datetime(2026, 6, 1, 9, tzinfo=dt.UTC),
)

# Cancel before it fires (no-op if the row is already in flight):
await broker.cancel_timer(queue="orders", timer_id="order-confirm-42", session=session)
```

`publish` returns the inserted row's `id`, or `None` if a row with the same `(queue, timer_id)` already exists. `cancel_timer` returns `True` if it deleted the row; `False` means either the timer didn't exist or was already leased to a worker (in which case delivery completes normally).

`publish_batch` accepts `activate_in` / `activate_at` to schedule every row in the batch identically — but no `timer_id` (per-row dedup makes no sense for a batch).

**Latency floor:** firing latency is bounded by the subscriber's `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. Lower it for sub-10s precision; sub-second precision is not a goal of this broker.

**Migration note:** existing deployments must regenerate Alembic migrations after upgrading — the new `timer_id` column and `<table>_timer_id_uq` partial unique index need to land in the database before publish-with-`timer_id` works.

## Schema validation

Schema validation is opt-in:
110 changes: 102 additions & 8 deletions faststream_outbox/broker.py
@@ -6,6 +6,7 @@
owns subscribers on the consumer side.
"""

import datetime as _dt
import logging
import typing
from collections.abc import Iterable, Sequence
@@ -22,7 +23,8 @@
from faststream._internal.types import BrokerMiddleware, CustomCallable
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict
from sqlalchemy import insert, text
from sqlalchemy import Float, bindparam, delete, func, insert, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from faststream_outbox.client import OutboxClient
@@ -186,54 +188,146 @@ async def validate_schema(self) -> None:
"""Validate the user's table matches what the package expects. Opt-in."""
await self.client.validate_schema()

async def publish( # ty: ignore[invalid-method-override]
async def publish( # ty: ignore[invalid-method-override] # noqa: PLR0913
self,
body: typing.Any,
*,
queue: str,
session: AsyncSession,
headers: dict[str, str] | None = None,
correlation_id: str | None = None,
) -> None:
activate_in: _dt.timedelta | None = None,
activate_at: _dt.datetime | None = None,
timer_id: str | None = None,
) -> int | None:
"""
Insert one outbox row using *session*'s open transaction.

Must be called inside a transaction the caller owns (typically inside an
``async with session.begin():`` block). ``publish`` does not flush, commit,
or open its own transaction — that is the whole point of the transactional
outbox pattern: the row commits atomically with the caller's domain writes.

Schedule a delayed delivery by passing exactly one of *activate_in* (relative)
or *activate_at* (absolute, tz-aware). Pass *timer_id* to deduplicate per
``(queue, timer_id)`` — re-publishing with the same id is a no-op (returns
``None``). Cancel a not-yet-leased timer with :meth:`cancel_timer`.

Returns the inserted row's ``id`` (BigInt PK), or ``None`` if a timer with
the same ``(queue, timer_id)`` already exists.
"""
if not isinstance(session, AsyncSession):
msg = "broker.publish requires an sqlalchemy.ext.asyncio.AsyncSession"
raise TypeError(msg)
if activate_in is not None and activate_at is not None:
msg = "broker.publish accepts at most one of activate_in / activate_at"
raise ValueError(msg)
payload, hdrs = _encode_payload(body, headers=headers, correlation_id=correlation_id)
await session.execute(insert(self._outbox_table).values(queue=queue, payload=payload, headers=hdrs))
await self._notify(session, queue)
t = self._outbox_table
values: dict[str, typing.Any] = {"queue": queue, "payload": payload, "headers": hdrs}
# Server-side compute keeps timing immune to worker/DB clock skew (mirrors
# client.mark_pending_with_lease).
if activate_in is not None:
values["next_attempt_at"] = func.now() + func.make_interval(
0, 0, 0, 0, 0, 0, bindparam("activate_in_seconds", activate_in.total_seconds(), type_=Float)
)
elif activate_at is not None:
values["next_attempt_at"] = activate_at
if timer_id is not None:
values["timer_id"] = timer_id
is_future = activate_in is not None or activate_at is not None

if timer_id is not None:
stmt = (
pg_insert(t)
.values(**values)
.on_conflict_do_nothing(
index_elements=[t.c.queue, t.c.timer_id],
index_where=t.c.timer_id.is_not(None),
)
.returning(t.c.id)
)
else:
stmt = insert(t).values(**values).returning(t.c.id)

result = await session.execute(stmt)
row_id: int | None = result.scalar()
# Skip NOTIFY for future-dated rows (listeners can't act before the gate
# opens — polling fires them at next tick) and on conflict (no row landed).
if row_id is not None and not is_future:
await self._notify(session, queue)
return row_id

async def publish_batch( # ty: ignore[invalid-method-override]
self,
*bodies: typing.Any,
queue: str,
session: AsyncSession,
headers: dict[str, str] | None = None,
activate_in: _dt.timedelta | None = None,
activate_at: _dt.datetime | None = None,
) -> None:
"""
Insert multiple outbox rows via *session*. Same transactional contract as ``publish``.

Each row gets its own auto-generated ``correlation_id``; pass *headers* to
share static headers across all rows.
share static headers across all rows. *activate_in* / *activate_at* schedule
every row in the batch identically — per-row timer dedup is not supported,
use :meth:`publish` for that.
"""
if not isinstance(session, AsyncSession):
msg = "broker.publish_batch requires an sqlalchemy.ext.asyncio.AsyncSession"
raise TypeError(msg)
if activate_in is not None and activate_at is not None:
msg = "broker.publish_batch accepts at most one of activate_in / activate_at"
raise ValueError(msg)
if not bodies:
return
# Client-side time for batch: executemany doesn't compose with column-level
# SQL expressions easily, and a few-ms drift versus the DB is harmless for
# user-supplied scheduling. (Retries still use server time via mark_pending_with_lease.)
next_at: _dt.datetime | None = None
if activate_in is not None:
next_at = _dt.datetime.now(tz=_dt.UTC) + activate_in
elif activate_at is not None:
next_at = activate_at
rows = []
for body in bodies:
payload, hdrs = _encode_payload(body, headers=headers)
rows.append({"queue": queue, "payload": payload, "headers": hdrs})
row: dict[str, typing.Any] = {"queue": queue, "payload": payload, "headers": hdrs}
if next_at is not None:
row["next_attempt_at"] = next_at
rows.append(row)
await session.execute(insert(self._outbox_table), rows)
await self._notify(session, queue)
if next_at is None:
await self._notify(session, queue)

async def cancel_timer(
self,
*,
queue: str,
timer_id: str,
session: AsyncSession,
) -> bool:
"""
Delete a not-yet-leased timer row. Returns True if a row was deleted.

Same transactional contract as :meth:`publish` — runs on the caller's session
and commits with their transaction. The ``acquired_token IS NULL`` guard
prevents canceling a row whose handler is already in flight: that returns
False and the delivery completes normally.
"""
if not isinstance(session, AsyncSession):
msg = "broker.cancel_timer requires an sqlalchemy.ext.asyncio.AsyncSession"
raise TypeError(msg)
t = self._outbox_table
stmt = delete(t).where(
t.c.queue == queue,
t.c.timer_id == timer_id,
t.c.acquired_token.is_(None),
)
result = await session.execute(stmt)
return (result.rowcount or 0) > 0 # ty: ignore[unresolved-attribute]

async def _notify(self, session: AsyncSession, queue: str) -> None:
"""
11 changes: 11 additions & 0 deletions faststream_outbox/schema.py
@@ -53,6 +53,7 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table
Column("last_attempt_at", DateTime(timezone=True), nullable=True),
Column("acquired_at", DateTime(timezone=True), nullable=True),
Column("acquired_token", Uuid, nullable=True),
Column("timer_id", String(255), nullable=True),
)
# Partial index that backs the fetch query's hot branch
# (`WHERE acquired_token IS NULL AND queue = ? AND next_attempt_at <= now()`).
@@ -63,4 +64,14 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table
table.c.next_attempt_at,
postgresql_where=table.c.acquired_token.is_(None),
)
# Partial unique index that backs `publish(..., timer_id=...)`'s ON CONFLICT DO NOTHING
# and the (queue, timer_id) lookup in `cancel_timer`. Only enforced when timer_id is set,
# so non-timer rows remain unconstrained.
Index(
f"{table_name}_timer_id_uq",
table.c.queue,
table.c.timer_id,
unique=True,
postgresql_where=table.c.timer_id.is_not(None),
)
return table