diff --git a/CLAUDE.md b/CLAUDE.md
index ae992a3..c5ec16c 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -35,9 +35,13 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr
### Two-loop subscriber (`subscriber/usecase.py`)
Per subscriber:
-1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors.
+1. **`_fetch_loop`** — owns a long-lived `AsyncConnection` for the fetch CTE and a separate raw asyncpg connection for `LISTEN outbox_
`. Single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` — idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error), the loop logs once and falls back to polling. On any DB error the connections are closed, the loop backs off exponentially (capped by `_BACKOFF_EXP_CAP=30`), and reopens. Test broker (no real engine) skips the persistent-connection / LISTEN path entirely and uses `client.fetch(...)` per iteration.
2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`.
+Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_', queue)` on the caller's session right after the INSERT. NOTIFY is transactional — listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.
+
+Channel naming convention: `outbox_`. Postgres limits identifiers to 63 chars, so users with table names longer than ~56 chars will silently lose the NOTIFY wake-up and degrade to polling.
+
### Lease-token invariant — load-bearing
Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and a newer fetch reclaimed the row with a fresh token, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this.
diff --git a/README.md b/README.md
index 85e0be6..12e2292 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ async with session_factory() as session, session.begin():
A subscriber owns two async loops:
-1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed.
+1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed. With the asyncpg driver, the loop also `LISTEN`s on `outbox_` and `publish` emits `pg_notify(...)`, so idle dispatch latency is sub-100ms instead of up to `max_fetch_interval`. Polling stays as the fallback.
2. **workers** (× `max_workers`) — dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`. On failure, the retry strategy decides: schedule another attempt, or terminal `DELETE`.
The `acquired_token` is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal `DELETE`/`UPDATE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row.
diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py
index df6700b..5c176e1 100644
--- a/faststream_outbox/broker.py
+++ b/faststream_outbox/broker.py
@@ -22,7 +22,7 @@
from faststream._internal.types import BrokerMiddleware, CustomCallable
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict
-from sqlalchemy import insert
+from sqlalchemy import insert, text
from sqlalchemy.ext.asyncio import AsyncSession
from faststream_outbox.client import OutboxClient
@@ -208,6 +208,7 @@ async def publish( # ty: ignore[invalid-method-override]
raise TypeError(msg)
payload, hdrs = _encode_payload(body, headers=headers, correlation_id=correlation_id)
await session.execute(insert(self._outbox_table).values(queue=queue, payload=payload, headers=hdrs))
+ await self._notify(session, queue)
async def publish_batch( # ty: ignore[invalid-method-override]
self,
@@ -232,6 +233,23 @@ async def publish_batch( # ty: ignore[invalid-method-override]
payload, hdrs = _encode_payload(body, headers=headers)
rows.append({"queue": queue, "payload": payload, "headers": hdrs})
await session.execute(insert(self._outbox_table), rows)
+ await self._notify(session, queue)
+
+ async def _notify(self, session: AsyncSession, queue: str) -> None:
+ """
+ Emit ``pg_notify('outbox_', queue)`` so listening subscribers wake immediately.
+
+ Uses ``pg_notify(...)`` rather than raw ``NOTIFY`` so the channel and payload
+ bind cleanly as parameters (raw NOTIFY accepts only literals — injection-prone).
+ Runs on the caller's session so the NOTIFY commits with the row insert; if the
+ caller's transaction rolls back, the NOTIFY is silently discarded by Postgres.
+ Safe no-op for non-Postgres dialects: subscribers without a matching LISTEN just
+ ignore it.
+ """
+ await session.execute(
+ text("SELECT pg_notify(:channel, :payload)"),
+ {"channel": f"outbox_{self._outbox_table.name}", "payload": queue},
+ )
async def request(self, *args: typing.Any, **kwargs: typing.Any) -> typing.NoReturn:
msg = "OutboxBroker does not support request-reply"
diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py
index b6b1f31..d001631 100644
--- a/faststream_outbox/client.py
+++ b/faststream_outbox/client.py
@@ -39,7 +39,7 @@
from collections.abc import Sequence
from sqlalchemy import Connection, Table
- from sqlalchemy.ext.asyncio import AsyncEngine
+ from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
@dataclass(frozen=True)
@@ -58,23 +58,34 @@ def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None:
def table(self) -> "Table":
return self._table
- async def fetch(
+ @property
+ def engine(self) -> "AsyncEngine":
+ """
+ The underlying ``AsyncEngine``.
+
+ Used by the subscriber loop to open its own long-lived fetch connection and to
+ drive ``LISTEN/NOTIFY``.
+ """
+ return self._engine
+
+ async def fetch_with_conn(
self,
+ conn: "AsyncConnection",
queues: "Sequence[str]",
*,
limit: int,
lease_ttl_seconds: float,
) -> list[OutboxInnerMessage]:
"""
- Atomically claim up to *limit* available rows for the given queue names.
+ Run the fetch CTE on a caller-supplied connection.
- A row is available iff its lease is unset (``acquired_token IS NULL``) or its
- lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each
- carries ``acquired_token`` which the worker loop must echo back on the terminal
- ``DELETE``/``UPDATE``.
+ Same contract as :meth:`fetch`. Used by ``OutboxSubscriber._fetch_loop`` to reuse
+ a long-lived connection instead of acquiring one per fetch from the pool. Each
+ call opens its own transaction on *conn* via ``async with conn.begin():``.
+
+ Callers must pass a non-empty *queues*; the empty-queue short-circuit lives in
+ :meth:`fetch`.
"""
- if not queues:
- return []
token = uuid.uuid4()
t = self._table
@@ -103,11 +114,40 @@ async def fetch(
)
.returning(*t.c)
)
- async with self._engine.begin() as conn:
+ async with conn.begin():
result = await conn.execute(stmt, {"lease_ttl": max(0.0, lease_ttl_seconds)})
rows = result.mappings().all()
return [_row_to_message(dict(row)) for row in rows]
+ async def fetch(
+ self,
+ queues: "Sequence[str]",
+ *,
+ limit: int,
+ lease_ttl_seconds: float,
+ ) -> list[OutboxInnerMessage]:
+ """
+ Atomically claim up to *limit* available rows for the given queue names.
+
+ A row is available iff its lease is unset (``acquired_token IS NULL``) or its
+ lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each
+ carries ``acquired_token`` which the worker loop must echo back on the terminal
+ ``DELETE``/``UPDATE``.
+
+ Convenience wrapper that opens a one-shot connection and delegates to
+ :meth:`fetch_with_conn`. The subscriber loop uses ``fetch_with_conn`` directly
+ with a long-lived connection.
+ """
+ if not queues:
+ return []
+ async with self._engine.connect() as conn:
+ return await self.fetch_with_conn(
+ conn,
+ queues,
+ limit=limit,
+ lease_ttl_seconds=lease_ttl_seconds,
+ )
+
async def delete_with_lease(self, message_id: int, acquired_token: uuid.UUID) -> bool:
"""Delete *message_id* iff it still holds *acquired_token*. Returns True if deleted."""
t = self._table
diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py
index 6fe8833..58fa725 100644
--- a/faststream_outbox/subscriber/usecase.py
+++ b/faststream_outbox/subscriber/usecase.py
@@ -6,11 +6,19 @@
* ``_fetch_loop`` claims available rows from Postgres and pushes them onto an
in-process queue. A row is available iff its lease is unset *or* expired
(``acquired_at < now() - lease_ttl_seconds``); the fetch CTE reclaims both cases
- in one round-trip. Adaptive idle backoff with jitter when the queue is empty.
+ in one round-trip. Adaptive idle backoff with jitter when the queue is empty —
+ but the sleep is short-circuited by ``LISTEN/NOTIFY`` when the asyncpg driver
+ is in use, dropping idle dispatch latency from up to ``max_fetch_interval`` to
+ ~10ms. Polling stays as the fallback if ``LISTEN`` setup fails.
* ``_worker_loop`` (one per ``max_workers``) pulls rows, dispatches via
``consume()``, and writes the terminal state back through the client. Every
terminal write is filtered by ``acquired_token`` so a row whose lease was
reclaimed by a newer fetch doesn't get clobbered by the stale handler.
+
+The fetch loop owns a long-lived ``AsyncConnection`` (used for the fetch CTE) and,
+when asyncpg is available, a separate raw asyncpg connection dedicated to LISTEN.
+On any error the connections are closed, the loop backs off, then both are reopened
+in the next iteration.
"""
import asyncio
@@ -31,6 +39,12 @@
from faststream_outbox.subscriber.config import OutboxSubscriberConfig, OutboxSubscriberSpecificationConfig
+try:
+ import asyncpg as _asyncpg
+except ImportError: # pragma: no cover
+ _asyncpg = None # ty: ignore[invalid-assignment]
+
+
_BACKOFF_EXP_CAP = 30
@@ -38,6 +52,7 @@
from faststream._internal.endpoint.publisher import PublisherProto
from faststream._internal.endpoint.subscriber.call_item import CallsCollection
from faststream.message import StreamMessage
+ from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
from faststream_outbox.client import OutboxClient
from faststream_outbox.configs import OutboxBrokerConfig
@@ -82,6 +97,10 @@ def __init__(
self._inflight: asyncio.Queue[OutboxInnerMessage] = asyncio.Queue(
maxsize=config.fetch_batch_size,
)
+ # Set by the LISTEN callback to wake _fetch_loop early; cleared after each
+ # wakeup. When LISTEN is unavailable the event simply never fires and the
+ # loop sleeps the full adaptive interval.
+ self._notify_event: asyncio.Event = asyncio.Event()
@property
def _client(self) -> "OutboxClient":
@@ -110,31 +129,84 @@ async def stop(self) -> None:
with anyio.move_on_after(self._outer_config.graceful_timeout):
await super().stop()
+ @property
+ def _notify_channel(self) -> str:
+ """
+ LISTEN channel name.
+
+ One channel per outbox table; subscribers ignore queues they don't care
+ about (cheap — wake-up does an empty fetch and goes back to sleep).
+ """
+ return f"outbox_{self._client.table.name}"
+
async def _fetch_loop(self) -> None:
+ """
+ Outer loop: own connection lifecycle, back off and reconnect on error.
+
+ When the client has no real engine (test broker), drives the inner loop without
+ a persistent connection — uses ``client.fetch(...)`` for each iteration and never
+ sets up LISTEN. The ``_notify_event`` simply never fires; behavior is polling-only.
+ """
+ error_attempt = 0
+ while self.running:
+ # Read client lazily inside the loop: in the test broker path the client is
+ # patched in/out via mock.patch, so it can be None after teardown. Returning
+ # cleanly (rather than raising RuntimeError) prevents FastStream's supervisor
+ # from restarting the task and leaking a pending coroutine at GC time.
+ client = self._outer_config.client
+ if client is None:
+ return
+ engine = client.engine
+ try:
+ if engine is None:
+ await self._fetch_inner(fetch_conn=None)
+ else:
+ async with engine.connect() as fetch_conn:
+ listen_conn = await self._open_listen_connection(engine)
+ try:
+ await self._fetch_inner(fetch_conn=fetch_conn)
+ finally:
+ if listen_conn is not None:
+ await listen_conn.close()
+ except Exception as e: # noqa: BLE001
+ self._log(
+ log_level=logging.ERROR,
+ message=f"Outbox fetch loop error: {e!r}; reconnecting",
+ exc_info=e,
+ )
+ error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP)
+ delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311
+ await anyio.sleep(delay)
+
+ async def _fetch_inner(self, *, fetch_conn: "AsyncConnection | None") -> None:
+ """
+ Fetch + adaptive backoff, with NOTIFY-driven wakeup.
+
+ Returns when ``self.running`` goes False, or raises on any DB error so the outer
+ loop can rebuild the connection.
+ """
base = self._config.min_fetch_interval
max_idle = self._config.max_fetch_interval
idle_count = 0
- error_attempt = 0
-
while self.running:
free = self._inflight.maxsize - self._inflight.qsize()
if free <= 0:
- await anyio.sleep(base)
+ await self._wait_for_notify_or_timeout(base)
continue
- try:
+ limit = min(free, self._config.fetch_batch_size)
+ if fetch_conn is None:
rows = await self._client.fetch(
self._queues,
- limit=min(free, self._config.fetch_batch_size),
+ limit=limit,
+ lease_ttl_seconds=self._config.lease_ttl_seconds,
+ )
+ else:
+ rows = await self._client.fetch_with_conn(
+ fetch_conn,
+ self._queues,
+ limit=limit,
lease_ttl_seconds=self._config.lease_ttl_seconds,
)
- except Exception as e: # noqa: BLE001
- self._log(log_level=logging.ERROR, message=f"Outbox fetch error: {e!r}", exc_info=e)
- error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP)
- delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311
- await anyio.sleep(delay)
- continue # pragma: no cover -- coverage misses bare-continue-after-await
-
- error_attempt = 0
if rows:
idle_count = 0
for row in rows:
@@ -142,7 +214,52 @@ async def _fetch_loop(self) -> None:
else:
idle_count = min(idle_count + 1, _BACKOFF_EXP_CAP)
delay = min(base * (2.0 ** (idle_count - 1)) * random.uniform(0.5, 1.5), max_idle) # noqa: S311
- await anyio.sleep(delay)
+ await self._wait_for_notify_or_timeout(delay)
+
+ async def _wait_for_notify_or_timeout(self, timeout: float) -> None: # noqa: ASYNC109
+ """Sleep up to *timeout* seconds, but wake immediately on a NOTIFY."""
+ with suppress(TimeoutError):
+ await asyncio.wait_for(self._notify_event.wait(), timeout=timeout)
+ self._notify_event.clear()
+
+ async def _open_listen_connection(self, engine: "AsyncEngine") -> "_asyncpg.Connection | None":
+ """
+ Open a dedicated raw asyncpg connection and register LISTEN on it.
+
+ Returns the connection on success, ``None`` on any failure (asyncpg not installed,
+ non-asyncpg driver, permission error, network problem). The fetch loop falls back
+ to polling-only behavior in that case.
+
+ A separate connection is required because asyncpg's ``add_listener`` makes the
+ connection's reader task monopolize it — interleaving normal queries breaks
+ notification delivery.
+ """
+ if _asyncpg is None or "asyncpg" not in (engine.url.drivername or ""):
+ return None
+ # SQLAlchemy URL with the +asyncpg suffix isn't a valid raw asyncpg DSN; strip it.
+ # ``str(url)`` hides the password — use ``render_as_string(hide_password=False)``
+ # so asyncpg.connect actually sees the credentials.
+ dsn = engine.url.set(drivername="postgresql").render_as_string(hide_password=False)
+ try:
+ conn = await _asyncpg.connect(dsn)
+ await conn.add_listener(self._notify_channel, self._on_notify)
+ except Exception as e: # noqa: BLE001
+ self._log(
+ log_level=logging.WARNING,
+ message=f"LISTEN setup failed; falling back to polling: {e!r}",
+ exc_info=e,
+ )
+ return None
+ return conn
+
+ def _on_notify(self, *_args: object) -> None:
+ """
+ Asyncpg notification callback: ``(connection, pid, channel, payload)``.
+
+ We only need the wake-up signal; payload is ignored. Setting an ``asyncio.Event``
+ from the asyncpg reader task is safe — it runs on the same event loop.
+ """
+ self._notify_event.set()
async def _worker_loop(self) -> None:
logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None
diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py
index 9ec399f..68f9f70 100644
--- a/faststream_outbox/testing.py
+++ b/faststream_outbox/testing.py
@@ -79,6 +79,22 @@ def rows(self) -> list[_FakeRow]:
def table(self) -> typing.Any:
return None
+ @property
+ def engine(self) -> None:
+ """No real engine — signals the subscriber loop to use the polling-only path."""
+ return None
+
+ async def fetch_with_conn(
+ self,
+ conn: typing.Any, # noqa: ARG002
+ queues: "Sequence[str]",
+ *,
+ limit: int,
+ lease_ttl_seconds: float,
+ ) -> list[OutboxInnerMessage]:
+ """Mirror :meth:`OutboxClient.fetch_with_conn`; *conn* is ignored by the fake."""
+ return await self.fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)
+
async def fetch(
self,
queues: "Sequence[str]",
diff --git a/tests/test_fake.py b/tests/test_fake.py
index fe4db9a..a061820 100644
--- a/tests/test_fake.py
+++ b/tests/test_fake.py
@@ -15,7 +15,7 @@
make_outbox_table,
)
from faststream_outbox.envelope import _encode_payload as encode_payload
-from faststream_outbox.testing import _FakeRow
+from faststream_outbox.testing import FakeOutboxClient, _FakeRow
def _make_broker() -> OutboxBroker:
@@ -580,3 +580,14 @@ async def handle(body: str) -> None:
await _wait_until(lambda: received, timeout=3.0)
assert received == ["via-router"]
+
+
+async def test_fake_client_fetch_with_conn_mirrors_fetch() -> None:
+ fake = FakeOutboxClient()
+ fake.feed(queue="q", payload=b"x")
+
+ rows = await fake.fetch_with_conn(None, ["q"], limit=10, lease_ttl_seconds=60.0)
+
+ assert len(rows) == 1
+ assert rows[0].queue == "q"
+ assert rows[0].payload == b"x"
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 71a2a8d..208a778 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -3,6 +3,7 @@
import asyncio
import datetime as _dt
import uuid
+from unittest import mock
import pytest
from sqlalchemy import insert, select, text
@@ -307,3 +308,88 @@ async def test_publish_batch_inserts_all_rows(pg_engine, outbox_table) -> None:
)
assert await _row_count(pg_engine, outbox_table) == 3
+
+
+async def test_notify_wakes_subscriber_well_before_polling_interval(pg_engine, outbox_table) -> None:
+ """LISTEN/NOTIFY must dispatch a freshly-published row long before the polling sleep elapses."""
+ received: list[dict] = []
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+
+ # Polling sleep ceiling is 10s; if NOTIFY works, dispatch happens in <500ms.
+ @broker.subscriber("orders", min_fetch_interval=10.0, max_fetch_interval=10.0)
+ async def handle(body: dict) -> None:
+ received.append(body)
+
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with broker:
+ # Let the subscriber settle into its idle wait so we know it's blocked on the
+ # polling sleep when NOTIFY arrives — proves the wake-up actually shortcuts.
+ await asyncio.sleep(0.5)
+ async with session_factory() as session, session.begin():
+ await broker.publish({"order_id": 1}, queue="orders", session=session)
+ # If NOTIFY wakeup works, this returns in tens of milliseconds. If it doesn't,
+ # this would block for ~10s. A 2s budget cleanly distinguishes the two.
+ await _wait_until(lambda: received, timeout=2.0)
+
+ assert received == [{"order_id": 1}]
+
+
+async def test_fetch_uses_persistent_connection(pg_engine, outbox_table) -> None:
+ """Every fetch must land on the same backend pid — proves the connection is reused."""
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ received: list[dict] = []
+ fetch_pids: list[int] = []
+ original_fetch_with_conn = OutboxClient.fetch_with_conn
+
+ async def tracking_fetch_with_conn(self, conn, queues, *, limit, lease_ttl_seconds):
+ # Probe the pid in its own transaction so SQLAlchemy's autobegun txn is
+ # closed before original_fetch_with_conn opens its own ``async with conn.begin():``.
+ async with conn.begin():
+ result = await conn.execute(text("SELECT pg_backend_pid()"))
+ fetch_pids.append(result.scalar_one())
+ return await original_fetch_with_conn(self, conn, queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)
+
+ @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
+ async def handle(body: dict) -> None:
+ received.append(body)
+
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ with mock.patch.object(OutboxClient, "fetch_with_conn", tracking_fetch_with_conn):
+ async with broker:
+ for i in range(5):
+ async with session_factory() as session, session.begin():
+ await broker.publish({"i": i}, queue="orders", session=session)
+ await _wait_until(lambda: len(received) == 5, timeout=5.0)
+
+ assert len(fetch_pids) >= 3, f"fetch_with_conn should run multiple times, got {len(fetch_pids)}"
+ assert len(set(fetch_pids)) == 1, f"persistent connection should hold one pid, saw {set(fetch_pids)}"
+
+
+async def test_notify_payload_carries_queue_name(pg_engine, outbox_table) -> None:
+ """The NOTIFY payload Postgres delivers to LISTEN clients must equal the queue name."""
+ received_payloads: list[str] = []
+ received_event = asyncio.Event()
+
+ # Open a raw asyncpg listener on the same channel the broker NOTIFYs on.
+ import asyncpg # noqa: PLC0415
+
+ dsn = pg_engine.url.set(drivername="postgresql").render_as_string(hide_password=False)
+ listener = await asyncpg.connect(dsn)
+ try:
+
+ def _cb(_conn, _pid, _channel, payload) -> None:
+ received_payloads.append(payload)
+ received_event.set()
+
+ await listener.add_listener(f"outbox_{outbox_table.name}", _cb)
+
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ await broker.publish({"x": 1}, queue="orders", session=session)
+ # Wait for the NOTIFY to land on our listener
+ await asyncio.wait_for(received_event.wait(), timeout=2.0)
+ finally:
+ await listener.close()
+
+ assert received_payloads == ["orders"]
diff --git a/tests/test_unit.py b/tests/test_unit.py
index 843131b..19b390e 100644
--- a/tests/test_unit.py
+++ b/tests/test_unit.py
@@ -1,6 +1,6 @@
import datetime as _dt
import uuid
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from sqlalchemy import MetaData
@@ -18,6 +18,7 @@
from faststream_outbox.envelope import _encode_payload
from faststream_outbox.message import OutboxInnerMessage, OutboxMessage
from faststream_outbox.parser.parser import OutboxParser
+from faststream_outbox.subscriber.usecase import OutboxSubscriber
def _make_broker(engine: object | None = None, table_name: str = "outbox") -> OutboxBroker:
@@ -274,19 +275,24 @@ async def test_broker_publish_rejects_non_async_session() -> None:
await broker.publish(b"x", queue="orders", session=object()) # ty: ignore[invalid-argument-type]
-async def test_broker_publish_executes_insert_on_session() -> None:
+async def test_broker_publish_executes_insert_then_pg_notify_on_session() -> None:
from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415
broker = _make_broker()
session = AsyncMock(spec=AsyncSession)
await broker.publish({"order_id": 1}, queue="orders", session=session)
- session.execute.assert_awaited_once()
- stmt = session.execute.await_args.args[0]
- assert "INSERT INTO" in str(stmt)
- params = stmt.compile().params
+ # Two execute calls: the INSERT, then SELECT pg_notify(...).
+ assert session.execute.await_count == 2
+ insert_stmt = session.execute.await_args_list[0].args[0]
+ assert "INSERT INTO" in str(insert_stmt)
+ params = insert_stmt.compile().params
assert params["queue"] == "orders"
assert params["payload"] == b'{"order_id": 1}'
assert params["headers"]["content-type"] == "application/json"
+ notify_stmt, notify_params = session.execute.await_args_list[1].args
+ assert "pg_notify" in str(notify_stmt)
+ assert notify_params["channel"] == "outbox_outbox"
+ assert notify_params["payload"] == "orders"
async def test_broker_publish_does_not_commit() -> None:
@@ -373,11 +379,15 @@ async def test_broker_publish_batch_executes_single_insert_for_many_rows() -> No
broker = _make_broker()
session = AsyncMock(spec=AsyncSession)
await broker.publish_batch(b"a", b"b", b"c", queue="orders", session=session)
- session.execute.assert_awaited_once()
- rows = session.execute.await_args.args[1]
+ # Two execute calls: the multi-row INSERT, then SELECT pg_notify(...).
+ assert session.execute.await_count == 2
+ rows = session.execute.await_args_list[0].args[1]
assert len(rows) == 3
assert all(r["queue"] == "orders" for r in rows)
assert {r["payload"] for r in rows} == {b"a", b"b", b"c"}
+ notify_stmt, notify_params = session.execute.await_args_list[1].args
+ assert "pg_notify" in str(notify_stmt)
+ assert notify_params["payload"] == "orders"
async def test_no_producer_methods_raise() -> None:
@@ -658,8 +668,6 @@ async def test_outbox_broker_config_connect_disconnect_noop() -> None:
async def test_subscriber_get_one_raises() -> None:
- from unittest.mock import MagicMock # noqa: PLC0415
-
metadata = MetaData()
t = make_outbox_table(metadata)
broker = OutboxBroker(outbox_table=t)
@@ -672,3 +680,45 @@ async def handle(body: str) -> None: ...
await sub.get_one()
# _make_response_publisher returns ()
assert sub._make_response_publisher(MagicMock()) == () # noqa: SLF001
+
+
+# --- _open_listen_connection fallback paths ---
+
+
+def _make_subscriber_for_listener_test() -> OutboxSubscriber:
+ broker = _make_broker()
+
+ @broker.subscriber("orders")
+ async def handle(body: dict) -> None: ...
+
+ return next(iter(broker._subscribers)) # noqa: SLF001
+
+
+async def test_open_listen_connection_returns_none_for_non_asyncpg_driver() -> None:
+ sub = _make_subscriber_for_listener_test()
+ engine = MagicMock()
+ engine.url.drivername = "postgresql" # no +asyncpg suffix
+
+ result = await sub._open_listen_connection(engine) # noqa: SLF001
+
+ assert result is None
+
+
+async def test_open_listen_connection_returns_none_when_asyncpg_connect_fails() -> None:
+ sub = _make_subscriber_for_listener_test()
+ engine = MagicMock()
+ engine.url.drivername = "postgresql+asyncpg"
+ engine.url.set.return_value.render_as_string.return_value = "postgresql://u:p@h/db"
+
+ with (
+ patch.object(sub, "_log") as log_mock,
+ patch(
+ "faststream_outbox.subscriber.usecase._asyncpg.connect",
+ new=AsyncMock(side_effect=OSError("boom")),
+ ),
+ ):
+ result = await sub._open_listen_connection(engine) # noqa: SLF001
+
+ assert result is None
+ log_mock.assert_called_once()
+ assert "LISTEN setup failed" in log_mock.call_args.kwargs["message"]