Merged
6 changes: 5 additions & 1 deletion CLAUDE.md
@@ -35,9 +35,13 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr
### Two-loop subscriber (`subscriber/usecase.py`)

Per subscriber:
1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors.
1. **`_fetch_loop`** — owns a long-lived `AsyncConnection` for the fetch CTE and a separate raw asyncpg connection for `LISTEN outbox_<table>`. Single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` — idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error), the loop logs once and falls back to polling. On any DB error the connections are closed, the loop backs off exponentially (capped by `_BACKOFF_EXP_CAP=30`), and reopens. Test broker (no real engine) skips the persistent-connection / LISTEN path entirely and uses `client.fetch(...)` per iteration.
2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`.
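Both the error path and the idle path share the same capped-exponential-with-jitter shape (`2.0 ** (attempt - 1)` scaled by a ±50% jitter factor, with the exponent capped at `_BACKOFF_EXP_CAP=30` and the delay capped at a ceiling). A minimal standalone sketch of that formula, generalized over the base interval (the helper name is illustrative, not the library's API):

```python
import random

_BACKOFF_EXP_CAP = 30  # cap on the exponent, mirroring usecase.py


def backoff_delay(attempt: int, base: float = 1.0, max_delay: float = 30.0) -> float:
    """Capped exponential backoff with +/-50% jitter, as in the fetch loop."""
    attempt = min(attempt, _BACKOFF_EXP_CAP)
    # Jitter decorrelates many subscribers waking at once; the min() keeps
    # runaway exponents from producing multi-minute sleeps.
    return min(base * (2.0 ** (attempt - 1)) * random.uniform(0.5, 1.5), max_delay)


# Successive failures grow the delay until the ceiling absorbs it.
delays = [backoff_delay(a) for a in range(1, 8)]
```

The error path uses `base=1.0` with a 30s ceiling; the idle path uses `min_fetch_interval` as the base and `max_fetch_interval` as the ceiling.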

Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_<table>', queue)` on the caller's session right after the INSERT. NOTIFY is transactional — listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.

Channel naming convention: `outbox_<table_name>`. Postgres limits identifiers to 63 chars, so users with table names longer than 56 chars (63 minus the 7-char `outbox_` prefix) will silently lose the NOTIFY wake-up and degrade to polling.
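That limit is easy to guard against up front. A sketch of such a check (illustrative only; per the note above, the library itself does not perform it):

```python
PG_MAX_IDENTIFIER = 63  # Postgres NAMEDATALEN - 1
PREFIX = "outbox_"      # 7 chars, leaving 56 for the table name


def notify_channel(table_name: str) -> str:
    """Channel name for a table; rejects names Postgres would mangle."""
    channel = PREFIX + table_name
    if len(channel) > PG_MAX_IDENTIFIER:
        # Past this length the LISTEN side and the pg_notify side no longer
        # agree on the channel, so the wake-up is lost and polling takes over.
        raise ValueError(f"channel {channel!r} exceeds {PG_MAX_IDENTIFIER} chars")
    return channel
```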

### Lease-token invariant — load-bearing

Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and a newer fetch reclaimed the row with a fresh token, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this.
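The invariant can be illustrated with an in-memory stand-in (a plain dict in place of the real table; the names are illustrative, not the library's API):

```python
import uuid

# row_id -> acquired_token; stand-in for the outbox table's lease columns.
leases: dict[int, uuid.UUID] = {}


def reclaim(row_id: int) -> uuid.UUID:
    """A fresh fetch re-leases the row under a brand-new token."""
    token = uuid.uuid4()
    leases[row_id] = token
    return token


def delete_with_lease(row_id: int, token: uuid.UUID) -> bool:
    """Terminal write: succeeds only if the caller still holds the lease."""
    if leases.get(row_id) == token:  # WHERE id=:id AND acquired_token=:token
        del leases[row_id]
        return True
    return False  # rowcount == 0: the stale write is silently dropped


slow = reclaim(1)   # slow handler takes the lease...
fresh = reclaim(1)  # ...lease expires, a newer fetch reclaims the row
assert delete_with_lease(1, slow) is False  # stale handler's DELETE is a no-op
assert delete_with_lease(1, fresh) is True  # current holder wins
```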
2 changes: 1 addition & 1 deletion README.md
@@ -42,7 +42,7 @@ async with session_factory() as session, session.begin():

A subscriber owns two async loops:

1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed.
1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed. With the asyncpg driver, the loop also `LISTEN`s on `outbox_<table>` and `publish` emits `pg_notify(...)`, so idle dispatch latency is sub-100ms instead of up to `max_fetch_interval`. Polling stays as the fallback.
2. **workers** (× `max_workers`) — dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`. On failure, the retry strategy decides: schedule another attempt, or terminal `DELETE`.

The `acquired_token` is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal `DELETE`/`UPDATE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row.
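The NOTIFY short-circuit described in step 1 boils down to waiting on an `asyncio.Event` with a timeout. A standalone sketch of the pattern (not the library's code):

```python
import asyncio


async def wait_for_notify_or_timeout(event: asyncio.Event, timeout: float) -> bool:
    """Sleep up to *timeout* seconds, but return early if the event fires."""
    try:
        await asyncio.wait_for(event.wait(), timeout=timeout)
        woke_early = True
    except asyncio.TimeoutError:
        woke_early = False
    event.clear()  # re-arm for the next NOTIFY
    return woke_early


async def main() -> bool:
    event = asyncio.Event()
    # Simulate a NOTIFY arriving 10ms into what would be a 10s idle sleep.
    asyncio.get_running_loop().call_later(0.01, event.set)
    return await wait_for_notify_or_timeout(event, timeout=10.0)
```

`asyncio.run(main())` returns almost immediately rather than after 10 seconds, which is exactly the latency win the fetch loop gets.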
20 changes: 19 additions & 1 deletion faststream_outbox/broker.py
@@ -22,7 +22,7 @@
from faststream._internal.types import BrokerMiddleware, CustomCallable
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict
from sqlalchemy import insert
from sqlalchemy import insert, text
from sqlalchemy.ext.asyncio import AsyncSession

from faststream_outbox.client import OutboxClient
@@ -208,6 +208,7 @@ async def publish( # ty: ignore[invalid-method-override]
raise TypeError(msg)
payload, hdrs = _encode_payload(body, headers=headers, correlation_id=correlation_id)
await session.execute(insert(self._outbox_table).values(queue=queue, payload=payload, headers=hdrs))
await self._notify(session, queue)

async def publish_batch( # ty: ignore[invalid-method-override]
self,
@@ -232,6 +233,23 @@ async def publish_batch( # ty: ignore[invalid-method-override]
payload, hdrs = _encode_payload(body, headers=headers)
rows.append({"queue": queue, "payload": payload, "headers": hdrs})
await session.execute(insert(self._outbox_table), rows)
await self._notify(session, queue)

async def _notify(self, session: AsyncSession, queue: str) -> None:
"""
Emit ``pg_notify('outbox_<table>', queue)`` so listening subscribers wake immediately.

Uses ``pg_notify(...)`` rather than raw ``NOTIFY`` so the channel and payload
bind cleanly as parameters (raw NOTIFY accepts only literals — injection-prone).
Runs on the caller's session so the NOTIFY commits with the row insert; if the
caller's transaction rolls back, the NOTIFY is silently discarded by Postgres.
Safe no-op for non-Postgres dialects: subscribers without a matching LISTEN just
ignore it.
"""
await session.execute(
text("SELECT pg_notify(:channel, :payload)"),
{"channel": f"outbox_{self._outbox_table.name}", "payload": queue},
)

async def request(self, *args: typing.Any, **kwargs: typing.Any) -> typing.NoReturn:
msg = "OutboxBroker does not support request-reply"
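The transactional-NOTIFY semantics that `_notify`'s docstring relies on (delivered only on commit, silently dropped on rollback) can be modeled with a tiny commit buffer. This is an illustration of the Postgres behavior, not the library's code; the class and attribute names are invented for the sketch:

```python
class FakeTxSession:
    """Buffers notifies Postgres-style: commit flushes them, rollback drops them."""

    def __init__(self) -> None:
        self._pending: list[tuple[str, str]] = []
        self.delivered: list[tuple[str, str]] = []

    def pg_notify(self, channel: str, payload: str) -> None:
        self._pending.append((channel, payload))  # queued inside the open tx

    def commit(self) -> None:
        self.delivered.extend(self._pending)  # listeners only see it now
        self._pending.clear()

    def rollback(self) -> None:
        self._pending.clear()  # the NOTIFY is silently discarded


s = FakeTxSession()
s.pg_notify("outbox_messages", "email")
s.rollback()  # rolled-back tx: nothing reaches listeners
s.pg_notify("outbox_messages", "email")
s.commit()    # committed tx: the listener wakes
assert s.delivered == [("outbox_messages", "email")]
```

Because the real NOTIFY rides the caller's session, atomicity with the row INSERT needs no extra coordination.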
60 changes: 50 additions & 10 deletions faststream_outbox/client.py
@@ -39,7 +39,7 @@
from collections.abc import Sequence

from sqlalchemy import Connection, Table
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine


@dataclass(frozen=True)
@@ -58,23 +58,34 @@ def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None:
def table(self) -> "Table":
return self._table

async def fetch(
@property
def engine(self) -> "AsyncEngine":
"""
The underlying ``AsyncEngine``.

Used by the subscriber loop to open its own long-lived fetch connection and to
drive ``LISTEN/NOTIFY``.
"""
return self._engine

async def fetch_with_conn(
self,
conn: "AsyncConnection",
queues: "Sequence[str]",
*,
limit: int,
lease_ttl_seconds: float,
) -> list[OutboxInnerMessage]:
"""
Atomically claim up to *limit* available rows for the given queue names.
Run the fetch CTE on a caller-supplied connection.

A row is available iff its lease is unset (``acquired_token IS NULL``) or its
lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each
carries ``acquired_token`` which the worker loop must echo back on the terminal
``DELETE``/``UPDATE``.
Same contract as :meth:`fetch`. Used by ``OutboxSubscriber._fetch_loop`` to reuse
a long-lived connection instead of acquiring one per fetch from the pool. Each
call opens its own transaction on *conn* via ``async with conn.begin():``.

Callers must pass a non-empty *queues*; the empty-queue short-circuit lives in
:meth:`fetch`.
"""
if not queues:
return []
token = uuid.uuid4()
t = self._table

@@ -103,11 +114,40 @@ async def fetch(
)
.returning(*t.c)
)
async with self._engine.begin() as conn:
async with conn.begin():
result = await conn.execute(stmt, {"lease_ttl": max(0.0, lease_ttl_seconds)})
rows = result.mappings().all()
return [_row_to_message(dict(row)) for row in rows]

async def fetch(
self,
queues: "Sequence[str]",
*,
limit: int,
lease_ttl_seconds: float,
) -> list[OutboxInnerMessage]:
"""
Atomically claim up to *limit* available rows for the given queue names.

A row is available iff its lease is unset (``acquired_token IS NULL``) or its
lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each
carries ``acquired_token`` which the worker loop must echo back on the terminal
``DELETE``/``UPDATE``.

Convenience wrapper that opens a one-shot connection and delegates to
:meth:`fetch_with_conn`. The subscriber loop uses ``fetch_with_conn`` directly
with a long-lived connection.
"""
if not queues:
return []
async with self._engine.connect() as conn:
return await self.fetch_with_conn(
conn,
queues,
limit=limit,
lease_ttl_seconds=lease_ttl_seconds,
)

async def delete_with_lease(self, message_id: int, acquired_token: uuid.UUID) -> bool:
"""Delete *message_id* iff it still holds *acquired_token*. Returns True if deleted."""
t = self._table
147 changes: 132 additions & 15 deletions faststream_outbox/subscriber/usecase.py
@@ -6,11 +6,19 @@
* ``_fetch_loop`` claims available rows from Postgres and pushes them onto an
in-process queue. A row is available iff its lease is unset *or* expired
(``acquired_at < now() - lease_ttl_seconds``); the fetch CTE reclaims both cases
in one round-trip. Adaptive idle backoff with jitter when the queue is empty.
in one round-trip. Adaptive idle backoff with jitter when the queue is empty —
but the sleep is short-circuited by ``LISTEN/NOTIFY`` when the asyncpg driver
is in use, dropping idle dispatch latency from up to ``max_fetch_interval`` to
~10ms. Polling stays as the fallback if ``LISTEN`` setup fails.
* ``_worker_loop`` (one per ``max_workers``) pulls rows, dispatches via
``consume()``, and writes the terminal state back through the client. Every
terminal write is filtered by ``acquired_token`` so a row whose lease was
reclaimed by a newer fetch doesn't get clobbered by the stale handler.

The fetch loop owns a long-lived ``AsyncConnection`` (used for the fetch CTE) and,
when asyncpg is available, a separate raw asyncpg connection dedicated to LISTEN.
On any error the connections are closed, the loop backs off, then both are reopened
in the next iteration.
"""

import asyncio
@@ -31,13 +39,20 @@
from faststream_outbox.subscriber.config import OutboxSubscriberConfig, OutboxSubscriberSpecificationConfig


try:
import asyncpg as _asyncpg
except ImportError: # pragma: no cover
_asyncpg = None # ty: ignore[invalid-assignment]


_BACKOFF_EXP_CAP = 30


if typing.TYPE_CHECKING:
from faststream._internal.endpoint.publisher import PublisherProto
from faststream._internal.endpoint.subscriber.call_item import CallsCollection
from faststream.message import StreamMessage
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from faststream_outbox.client import OutboxClient
from faststream_outbox.configs import OutboxBrokerConfig
@@ -82,6 +97,10 @@ def __init__(
self._inflight: asyncio.Queue[OutboxInnerMessage] = asyncio.Queue(
maxsize=config.fetch_batch_size,
)
# Set by the LISTEN callback to wake _fetch_loop early; cleared after each
# wakeup. When LISTEN is unavailable the event simply never fires and the
# loop sleeps the full adaptive interval.
self._notify_event: asyncio.Event = asyncio.Event()

@property
def _client(self) -> "OutboxClient":
@@ -110,39 +129,137 @@ async def stop(self) -> None:
with anyio.move_on_after(self._outer_config.graceful_timeout):
await super().stop()

@property
def _notify_channel(self) -> str:
"""
LISTEN channel name.

One channel per outbox table; subscribers ignore queues they don't care
about (cheap — wake-up does an empty fetch and goes back to sleep).
"""
return f"outbox_{self._client.table.name}"

async def _fetch_loop(self) -> None:
"""
Outer loop: own connection lifecycle, back off and reconnect on error.

When the client has no real engine (test broker), drives the inner loop without
a persistent connection — uses ``client.fetch(...)`` for each iteration and never
sets up LISTEN. The ``_notify_event`` simply never fires; behavior is polling-only.
"""
error_attempt = 0
while self.running:
# Read client lazily inside the loop: in the test broker path the client is
# patched in/out via mock.patch, so it can be None after teardown. Returning
# cleanly (rather than raising RuntimeError) prevents FastStream's supervisor
# from restarting the task and leaking a pending coroutine at GC time.
client = self._outer_config.client
if client is None:
return
engine = client.engine
try:
if engine is None:
await self._fetch_inner(fetch_conn=None)
else:
async with engine.connect() as fetch_conn:
listen_conn = await self._open_listen_connection(engine)
try:
await self._fetch_inner(fetch_conn=fetch_conn)
finally:
if listen_conn is not None:
await listen_conn.close()
except Exception as e: # noqa: BLE001
self._log(
log_level=logging.ERROR,
message=f"Outbox fetch loop error: {e!r}; reconnecting",
exc_info=e,
)
error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP)
delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311
await anyio.sleep(delay)

async def _fetch_inner(self, *, fetch_conn: "AsyncConnection | None") -> None:
"""
Fetch + adaptive backoff, with NOTIFY-driven wakeup.

Returns when ``self.running`` goes False, or raises on any DB error so the outer
loop can rebuild the connection.
"""
base = self._config.min_fetch_interval
max_idle = self._config.max_fetch_interval
idle_count = 0
error_attempt = 0

while self.running:
free = self._inflight.maxsize - self._inflight.qsize()
if free <= 0:
await anyio.sleep(base)
await self._wait_for_notify_or_timeout(base)
continue
try:
limit = min(free, self._config.fetch_batch_size)
if fetch_conn is None:
rows = await self._client.fetch(
self._queues,
limit=min(free, self._config.fetch_batch_size),
limit=limit,
lease_ttl_seconds=self._config.lease_ttl_seconds,
)
else:
rows = await self._client.fetch_with_conn(
fetch_conn,
self._queues,
limit=limit,
lease_ttl_seconds=self._config.lease_ttl_seconds,
)
except Exception as e: # noqa: BLE001
self._log(log_level=logging.ERROR, message=f"Outbox fetch error: {e!r}", exc_info=e)
error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP)
delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311
await anyio.sleep(delay)
continue # pragma: no cover -- coverage misses bare-continue-after-await

error_attempt = 0
if rows:
idle_count = 0
for row in rows:
await self._inflight.put(row)
else:
idle_count = min(idle_count + 1, _BACKOFF_EXP_CAP)
delay = min(base * (2.0 ** (idle_count - 1)) * random.uniform(0.5, 1.5), max_idle) # noqa: S311
await anyio.sleep(delay)
await self._wait_for_notify_or_timeout(delay)

async def _wait_for_notify_or_timeout(self, timeout: float) -> None: # noqa: ASYNC109
"""Sleep up to *timeout* seconds, but wake immediately on a NOTIFY."""
with suppress(TimeoutError):
await asyncio.wait_for(self._notify_event.wait(), timeout=timeout)
self._notify_event.clear()

async def _open_listen_connection(self, engine: "AsyncEngine") -> "_asyncpg.Connection | None":
"""
Open a dedicated raw asyncpg connection and register LISTEN on it.

Returns the connection on success, ``None`` on any failure (asyncpg not installed,
non-asyncpg driver, permission error, network problem). The fetch loop falls back
to polling-only behavior in that case.

A separate connection is required because asyncpg's ``add_listener`` makes the
connection's reader task monopolize it — interleaving normal queries breaks
notification delivery.
"""
if _asyncpg is None or "asyncpg" not in (engine.url.drivername or ""):
return None
# SQLAlchemy URL with the +asyncpg suffix isn't a valid raw asyncpg DSN; strip it.
# ``str(url)`` hides the password — use ``render_as_string(hide_password=False)``
# so asyncpg.connect actually sees the credentials.
dsn = engine.url.set(drivername="postgresql").render_as_string(hide_password=False)
try:
conn = await _asyncpg.connect(dsn)
await conn.add_listener(self._notify_channel, self._on_notify)
except Exception as e: # noqa: BLE001
self._log(
log_level=logging.WARNING,
message=f"LISTEN setup failed; falling back to polling: {e!r}",
exc_info=e,
)
return None
return conn

def _on_notify(self, *_args: object) -> None:
"""
Asyncpg notification callback: ``(connection, pid, channel, payload)``.

We only need the wake-up signal; payload is ignored. Setting an ``asyncio.Event``
from the asyncpg reader task is safe — it runs on the same event loop.
"""
self._notify_event.set()

async def _worker_loop(self) -> None:
logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None
16 changes: 16 additions & 0 deletions faststream_outbox/testing.py
@@ -79,6 +79,22 @@ def rows(self) -> list[_FakeRow]:
def table(self) -> typing.Any:
return None

@property
def engine(self) -> None:
"""No real engine — signals the subscriber loop to use the polling-only path."""
return None

async def fetch_with_conn(
self,
conn: typing.Any, # noqa: ARG002
queues: "Sequence[str]",
*,
limit: int,
lease_ttl_seconds: float,
) -> list[OutboxInnerMessage]:
"""Mirror :meth:`OutboxClient.fetch_with_conn`; *conn* is ignored by the fake."""
return await self.fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)

async def fetch(
self,
queues: "Sequence[str]",