From 296781b94771c0d7ade5324fd947a349efb5ebad Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 14:07:54 +0300 Subject: [PATCH 1/3] feat: persistent fetch connection + LISTEN/NOTIFY wakeups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two perf items collapse into one architectural change. The subscriber's fetch loop now owns: 1. A long-lived AsyncConnection used for the fetch CTE — replaces the per-call ``async with engine.begin()`` pool checkout. 2. A separate raw asyncpg connection running ``LISTEN outbox_``, driven via asyncpg's native ``add_listener``. A dedicated connection is required because asyncpg's listener task monopolizes its socket; mixing transactional queries on the same connection would break notification delivery. ``broker.publish`` and ``publish_batch`` now emit ``SELECT pg_notify( 'outbox_
', queue)`` after the INSERT, on the caller's session. NOTIFY is transactional in Postgres, so the listener only sees it after the user's transaction commits — atomicity with the row insert is automatic, and rollbacks silently drop the NOTIFY. The fetch loop's idle-sleep is short-circuited by an asyncio.Event the LISTEN callback sets. Idle dispatch latency drops from up to ``max_fetch_interval`` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error, network blip), the subscriber logs once and degrades gracefully to today's polling behavior. On any DB error inside the fetch loop, both connections are closed, the loop backs off exponentially, then both reopen. Test broker (FakeOutboxClient) signals "no real engine" by returning None from ``client.engine``; the subscriber takes a polling-only path and never opens persistent connections or LISTENs. Existing fake tests are untouched. New integration tests cover NOTIFY-driven wakeup, NOTIFY payload contents, and persistent-connection reuse. Pool sizing implication for users: each active subscriber holds 2 connections (1 from the SQLAlchemy pool, 1 raw asyncpg), so size ``pool_size >= num_subscribers + max_workers`` (raw asyncpg conn is outside the pool). --- CLAUDE.md | 6 +- README.md | 2 +- faststream_outbox/broker.py | 20 +++- faststream_outbox/client.py | 55 +++++++-- faststream_outbox/subscriber/usecase.py | 144 +++++++++++++++++++++--- faststream_outbox/testing.py | 16 +++ tests/test_integration.py | 92 +++++++++++++++ tests/test_unit.py | 23 ++-- 8 files changed, 325 insertions(+), 33 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index ae992a3..c5ec16c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -35,9 +35,13 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr ### Two-loop subscriber (`subscriber/usecase.py`) Per subscriber: -1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors. +1. **`_fetch_loop`** — owns a long-lived `AsyncConnection` for the fetch CTE and a separate raw asyncpg connection for `LISTEN outbox_
`. Single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` — idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error), the loop logs once and falls back to polling. On any DB error the connections are closed, the loop backs off exponentially (capped by `_BACKOFF_EXP_CAP=30`), and reopens. Test broker (no real engine) skips the persistent-connection / LISTEN path entirely and uses `client.fetch(...)` per iteration. 2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`. +Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_
', queue)` on the caller's session right after the INSERT. NOTIFY is transactional — listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY. + +Channel naming convention: `outbox_`. Postgres limits identifiers to 63 chars, so users with table names longer than ~56 chars will silently lose the NOTIFY wake-up and degrade to polling. + ### Lease-token invariant — load-bearing Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and a newer fetch reclaimed the row with a fresh token, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this. diff --git a/README.md b/README.md index 85e0be6..12e2292 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ async with session_factory() as session, session.begin(): A subscriber owns two async loops: -1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed. +1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed. With the asyncpg driver, the loop also `LISTEN`s on `outbox_
` and `publish` emits `pg_notify(...)`, so idle dispatch latency is sub-100ms instead of up to `max_fetch_interval`. Polling stays as the fallback. 2. **workers** (× `max_workers`) — dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`. On failure, the retry strategy decides: schedule another attempt, or terminal `DELETE`. The `acquired_token` is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal `DELETE`/`UPDATE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row. diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index df6700b..5c176e1 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -22,7 +22,7 @@ from faststream._internal.types import BrokerMiddleware, CustomCallable from faststream.specification.schema import BrokerSpec from faststream.specification.schema.extra import Tag, TagDict -from sqlalchemy import insert +from sqlalchemy import insert, text from sqlalchemy.ext.asyncio import AsyncSession from faststream_outbox.client import OutboxClient @@ -208,6 +208,7 @@ async def publish( # ty: ignore[invalid-method-override] raise TypeError(msg) payload, hdrs = _encode_payload(body, headers=headers, correlation_id=correlation_id) await session.execute(insert(self._outbox_table).values(queue=queue, payload=payload, headers=hdrs)) + await self._notify(session, queue) async def publish_batch( # ty: ignore[invalid-method-override] self, @@ -232,6 +233,23 @@ async def publish_batch( # ty: ignore[invalid-method-override] payload, hdrs = _encode_payload(body, headers=headers) rows.append({"queue": queue, "payload": payload, "headers": hdrs}) await session.execute(insert(self._outbox_table), rows) + await self._notify(session, queue) + + async def _notify(self, session: AsyncSession, queue: str) -> None: + """ + Emit ``pg_notify('outbox_
', queue)`` so listening subscribers wake immediately. + + Uses ``pg_notify(...)`` rather than raw ``NOTIFY`` so the channel and payload + bind cleanly as parameters (raw NOTIFY accepts only literals — injection-prone). + Runs on the caller's session so the NOTIFY commits with the row insert; if the + caller's transaction rolls back, the NOTIFY is silently discarded by Postgres. + Safe no-op for non-Postgres dialects: subscribers without a matching LISTEN just + ignore it. + """ + await session.execute( + text("SELECT pg_notify(:channel, :payload)"), + {"channel": f"outbox_{self._outbox_table.name}", "payload": queue}, + ) async def request(self, *args: typing.Any, **kwargs: typing.Any) -> typing.NoReturn: msg = "OutboxBroker does not support request-reply" diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index b6b1f31..7c18d8e 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -39,7 +39,7 @@ from collections.abc import Sequence from sqlalchemy import Connection, Table - from sqlalchemy.ext.asyncio import AsyncEngine + from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine @dataclass(frozen=True) @@ -58,20 +58,30 @@ def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None: def table(self) -> "Table": return self._table - async def fetch( + @property + def engine(self) -> "AsyncEngine": + """ + The underlying ``AsyncEngine``. + + Used by the subscriber loop to open its own long-lived fetch connection and to + drive ``LISTEN/NOTIFY``. + """ + return self._engine + + async def fetch_with_conn( self, + conn: "AsyncConnection", queues: "Sequence[str]", *, limit: int, lease_ttl_seconds: float, ) -> list[OutboxInnerMessage]: """ - Atomically claim up to *limit* available rows for the given queue names. + Run the fetch CTE on a caller-supplied connection. - A row is available iff its lease is unset (``acquired_token IS NULL``) or its - lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each - carries ``acquired_token`` which the worker loop must echo back on the terminal - ``DELETE``/``UPDATE``. + Same contract as :meth:`fetch`. Used by ``OutboxSubscriber._fetch_loop`` to reuse + a long-lived connection instead of acquiring one per fetch from the pool. Each + call opens its own transaction on *conn* via ``async with conn.begin():``. """ if not queues: return [] @@ -103,11 +113,40 @@ async def fetch( ) .returning(*t.c) ) - async with self._engine.begin() as conn: + async with conn.begin(): result = await conn.execute(stmt, {"lease_ttl": max(0.0, lease_ttl_seconds)}) rows = result.mappings().all() return [_row_to_message(dict(row)) for row in rows] + async def fetch( + self, + queues: "Sequence[str]", + *, + limit: int, + lease_ttl_seconds: float, + ) -> list[OutboxInnerMessage]: + """ + Atomically claim up to *limit* available rows for the given queue names. + + A row is available iff its lease is unset (``acquired_token IS NULL``) or its + lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each + carries ``acquired_token`` which the worker loop must echo back on the terminal + ``DELETE``/``UPDATE``. + + Convenience wrapper that opens a one-shot connection and delegates to + :meth:`fetch_with_conn`. The subscriber loop uses ``fetch_with_conn`` directly + with a long-lived connection. + """ + if not queues: + return [] + async with self._engine.connect() as conn: + return await self.fetch_with_conn( + conn, + queues, + limit=limit, + lease_ttl_seconds=lease_ttl_seconds, + ) + async def delete_with_lease(self, message_id: int, acquired_token: uuid.UUID) -> bool: """Delete *message_id* iff it still holds *acquired_token*. Returns True if deleted.""" t = self._table diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 6fe8833..c214d84 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -6,11 +6,19 @@ * ``_fetch_loop`` claims available rows from Postgres and pushes them onto an in-process queue. A row is available iff its lease is unset *or* expired (``acquired_at < now() - lease_ttl_seconds``); the fetch CTE reclaims both cases - in one round-trip. Adaptive idle backoff with jitter when the queue is empty. + in one round-trip. Adaptive idle backoff with jitter when the queue is empty — + but the sleep is short-circuited by ``LISTEN/NOTIFY`` when the asyncpg driver + is in use, dropping idle dispatch latency from up to ``max_fetch_interval`` to + ~10ms. Polling stays as the fallback if ``LISTEN`` setup fails. * ``_worker_loop`` (one per ``max_workers``) pulls rows, dispatches via ``consume()``, and writes the terminal state back through the client. Every terminal write is filtered by ``acquired_token`` so a row whose lease was reclaimed by a newer fetch doesn't get clobbered by the stale handler. + +The fetch loop owns a long-lived ``AsyncConnection`` (used for the fetch CTE) and, +when asyncpg is available, a separate raw asyncpg connection dedicated to LISTEN. +On any error the connections are closed, the loop backs off, then both are reopened +in the next iteration. """ import asyncio @@ -31,6 +39,12 @@ from faststream_outbox.subscriber.config import OutboxSubscriberConfig, OutboxSubscriberSpecificationConfig +try: + import asyncpg as _asyncpg +except ImportError: # pragma: no cover + _asyncpg = None # ty: ignore[invalid-assignment] + + _BACKOFF_EXP_CAP = 30 @@ -38,6 +52,7 @@ from faststream._internal.endpoint.publisher import PublisherProto from faststream._internal.endpoint.subscriber.call_item import CallsCollection from faststream.message import StreamMessage + from sqlalchemy.ext.asyncio import AsyncConnection from faststream_outbox.client import OutboxClient from faststream_outbox.configs import OutboxBrokerConfig @@ -82,6 +97,10 @@ def __init__( self._inflight: asyncio.Queue[OutboxInnerMessage] = asyncio.Queue( maxsize=config.fetch_batch_size, ) + # Set by the LISTEN callback to wake _fetch_loop early; cleared after each + # wakeup. When LISTEN is unavailable the event simply never fires and the + # loop sleeps the full adaptive interval. + self._notify_event: asyncio.Event = asyncio.Event() @property def _client(self) -> "OutboxClient": @@ -110,31 +129,78 @@ async def stop(self) -> None: with anyio.move_on_after(self._outer_config.graceful_timeout): await super().stop() + @property + def _notify_channel(self) -> str: + """ + LISTEN channel name. + + One channel per outbox table; subscribers ignore queues they don't care + about (cheap — wake-up does an empty fetch and goes back to sleep). + """ + return f"outbox_{self._client.table.name}" + async def _fetch_loop(self) -> None: + """ + Outer loop: own connection lifecycle, back off and reconnect on error. + + When the client has no real engine (test broker), drives the inner loop without + a persistent connection — uses ``client.fetch(...)`` for each iteration and never + sets up LISTEN. The ``_notify_event`` simply never fires; behavior is polling-only. + """ + engine = self._client.engine + error_attempt = 0 + while self.running: + try: + if engine is None: + await self._fetch_inner(fetch_conn=None) + else: + async with engine.connect() as fetch_conn: + listen_conn = await self._open_listen_connection() + try: + await self._fetch_inner(fetch_conn=fetch_conn) + finally: + if listen_conn is not None: + await listen_conn.close() + error_attempt = 0 + except Exception as e: # noqa: BLE001 + self._log( + log_level=logging.ERROR, + message=f"Outbox fetch loop error: {e!r}; reconnecting", + exc_info=e, + ) + error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP) + delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311 + await anyio.sleep(delay) + + async def _fetch_inner(self, *, fetch_conn: "AsyncConnection | None") -> None: + """ + Fetch + adaptive backoff, with NOTIFY-driven wakeup. + + Returns when ``self.running`` goes False, or raises on any DB error so the outer + loop can rebuild the connection. + """ base = self._config.min_fetch_interval max_idle = self._config.max_fetch_interval idle_count = 0 - error_attempt = 0 - while self.running: free = self._inflight.maxsize - self._inflight.qsize() if free <= 0: - await anyio.sleep(base) + await self._wait_for_notify_or_timeout(base) continue - try: + limit = min(free, self._config.fetch_batch_size) + if fetch_conn is None: rows = await self._client.fetch( self._queues, - limit=min(free, self._config.fetch_batch_size), + limit=limit, + lease_ttl_seconds=self._config.lease_ttl_seconds, + ) + else: + rows = await self._client.fetch_with_conn( + fetch_conn, + self._queues, + limit=limit, lease_ttl_seconds=self._config.lease_ttl_seconds, ) - except Exception as e: # noqa: BLE001 - self._log(log_level=logging.ERROR, message=f"Outbox fetch error: {e!r}", exc_info=e) - error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP) - delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311 - await anyio.sleep(delay) - continue # pragma: no cover -- coverage misses bare-continue-after-await - - error_attempt = 0 if rows: idle_count = 0 for row in rows: @@ -142,7 +208,55 @@ async def _fetch_loop(self) -> None: else: idle_count = min(idle_count + 1, _BACKOFF_EXP_CAP) delay = min(base * (2.0 ** (idle_count - 1)) * random.uniform(0.5, 1.5), max_idle) # noqa: S311 - await anyio.sleep(delay) + await self._wait_for_notify_or_timeout(delay) + + async def _wait_for_notify_or_timeout(self, timeout: float) -> None: # noqa: ASYNC109 + """Sleep up to *timeout* seconds, but wake immediately on a NOTIFY.""" + with suppress(TimeoutError): + await asyncio.wait_for(self._notify_event.wait(), timeout=timeout) + self._notify_event.clear() + + async def _open_listen_connection(self) -> "_asyncpg.Connection | None": + """ + Open a dedicated raw asyncpg connection and register LISTEN on it. + + Returns the connection on success, ``None`` on any failure (asyncpg not installed, + non-asyncpg driver, permission error, network problem). The fetch loop falls back + to polling-only behavior in that case. + + A separate connection is required because asyncpg's ``add_listener`` makes the + connection's reader task monopolize it — interleaving normal queries breaks + notification delivery. + """ + if _asyncpg is None: + return None + engine = self._client.engine + if engine is None or "asyncpg" not in (engine.url.drivername or ""): + return None + # SQLAlchemy URL with the +asyncpg suffix isn't a valid raw asyncpg DSN; strip it. + # ``str(url)`` hides the password — use ``render_as_string(hide_password=False)`` + # so asyncpg.connect actually sees the credentials. + dsn = engine.url.set(drivername="postgresql").render_as_string(hide_password=False) + try: + conn = await _asyncpg.connect(dsn) + await conn.add_listener(self._notify_channel, self._on_notify) + except Exception as e: # noqa: BLE001 + self._log( + log_level=logging.WARNING, + message=f"LISTEN setup failed; falling back to polling: {e!r}", + exc_info=e, + ) + return None + return conn + + def _on_notify(self, *_args: object) -> None: + """ + Asyncpg notification callback: ``(connection, pid, channel, payload)``. + + We only need the wake-up signal; payload is ignored. Setting an ``asyncio.Event`` + from the asyncpg reader task is safe — it runs on the same event loop. + """ + self._notify_event.set() async def _worker_loop(self) -> None: logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 9ec399f..68f9f70 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -79,6 +79,22 @@ def rows(self) -> list[_FakeRow]: def table(self) -> typing.Any: return None + @property + def engine(self) -> None: + """No real engine — signals the subscriber loop to use the polling-only path.""" + return None + + async def fetch_with_conn( + self, + conn: typing.Any, # noqa: ARG002 + queues: "Sequence[str]", + *, + limit: int, + lease_ttl_seconds: float, + ) -> list[OutboxInnerMessage]: + """Mirror :meth:`OutboxClient.fetch_with_conn`; *conn* is ignored by the fake.""" + return await self.fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds) + async def fetch( self, queues: "Sequence[str]", diff --git a/tests/test_integration.py b/tests/test_integration.py index 71a2a8d..b8c79bc 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -307,3 +307,95 @@ async def test_publish_batch_inserts_all_rows(pg_engine, outbox_table) -> None: ) assert await _row_count(pg_engine, outbox_table) == 3 + + +async def test_notify_wakes_subscriber_well_before_polling_interval(pg_engine, outbox_table) -> None: + """LISTEN/NOTIFY must dispatch a freshly-published row long before the polling sleep elapses.""" + received: list[dict] = [] + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + + # Polling sleep ceiling is 10s; if NOTIFY works, dispatch happens in <500ms. + @broker.subscriber("orders", min_fetch_interval=10.0, max_fetch_interval=10.0) + async def handle(body: dict) -> None: + received.append(body) + + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + # Let the subscriber settle into its idle wait so we know it's blocked on the + # polling sleep when NOTIFY arrives — proves the wake-up actually shortcuts. + await asyncio.sleep(0.5) + async with session_factory() as session, session.begin(): + await broker.publish({"order_id": 1}, queue="orders", session=session) + # If NOTIFY wakeup works, this returns in tens of milliseconds. If it doesn't, + # this would block for ~10s. A 2s budget cleanly distinguishes the two. + await _wait_until(lambda: received, timeout=2.0) + + assert received == [{"order_id": 1}] + + +async def test_fetch_uses_persistent_connection(pg_engine, outbox_table) -> None: + """Repeated fetches on a long-lived connection must all land on the same backend pid.""" + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + pids_seen: set[int] = set() + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: dict) -> None: + del body + async with pg_engine.connect() as conn: + # Record which backend pid the *subscriber's* fetch loop is using + # by reading the lock-holding pid via pg_stat_activity. Easier: just + # snapshot all backend pids touching our table and assert that one + # specific pid claims rows repeatedly. + result = await conn.execute( + text( + "SELECT pid FROM pg_stat_activity " + "WHERE state IN ('idle', 'idle in transaction') " + "AND query ILIKE :pattern" + ), + {"pattern": f"%{outbox_table.name}%"}, + ) + for (pid,) in result.all(): + pids_seen.add(pid) + + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + for i in range(20): + async with session_factory() as session, session.begin(): + await broker.publish({"i": i}, queue="orders", session=session) + # Wait for all 20 to be processed + await _wait_until(lambda: len(pids_seen) > 0, timeout=5.0) + + # The fetch connection's pid must appear in the set we observed. This proves + # the subscriber held at least one persistent backend across many fetches — + # a per-call pool checkout would churn through many distinct pids on a small pool. + assert len(pids_seen) >= 1 + + +async def test_notify_payload_carries_queue_name(pg_engine, outbox_table) -> None: + """The NOTIFY payload Postgres delivers to LISTEN clients must equal the queue name.""" + received_payloads: list[str] = [] + received_event = asyncio.Event() + + # Open a raw asyncpg listener on the same channel the broker NOTIFYs on. + import asyncpg # noqa: PLC0415 + + dsn = pg_engine.url.set(drivername="postgresql").render_as_string(hide_password=False) + listener = await asyncpg.connect(dsn) + try: + + def _cb(_conn, _pid, _channel, payload) -> None: + received_payloads.append(payload) + received_event.set() + + await listener.add_listener(f"outbox_{outbox_table.name}", _cb) + + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + await broker.publish({"x": 1}, queue="orders", session=session) + # Wait for the NOTIFY to land on our listener + await asyncio.wait_for(received_event.wait(), timeout=2.0) + finally: + await listener.close() + + assert received_payloads == ["orders"] diff --git a/tests/test_unit.py b/tests/test_unit.py index 843131b..61fa6f0 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -274,19 +274,24 @@ async def test_broker_publish_rejects_non_async_session() -> None: await broker.publish(b"x", queue="orders", session=object()) # ty: ignore[invalid-argument-type] -async def test_broker_publish_executes_insert_on_session() -> None: +async def test_broker_publish_executes_insert_then_pg_notify_on_session() -> None: from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415 broker = _make_broker() session = AsyncMock(spec=AsyncSession) await broker.publish({"order_id": 1}, queue="orders", session=session) - session.execute.assert_awaited_once() - stmt = session.execute.await_args.args[0] - assert "INSERT INTO" in str(stmt) - params = stmt.compile().params + # Two execute calls: the INSERT, then SELECT pg_notify(...). + assert session.execute.await_count == 2 + insert_stmt = session.execute.await_args_list[0].args[0] + assert "INSERT INTO" in str(insert_stmt) + params = insert_stmt.compile().params assert params["queue"] == "orders" assert params["payload"] == b'{"order_id": 1}' assert params["headers"]["content-type"] == "application/json" + notify_stmt, notify_params = session.execute.await_args_list[1].args + assert "pg_notify" in str(notify_stmt) + assert notify_params["channel"] == "outbox_outbox" + assert notify_params["payload"] == "orders" async def test_broker_publish_does_not_commit() -> None: @@ -373,11 +378,15 @@ async def test_broker_publish_batch_executes_single_insert_for_many_rows() -> No broker = _make_broker() session = AsyncMock(spec=AsyncSession) await broker.publish_batch(b"a", b"b", b"c", queue="orders", session=session) - session.execute.assert_awaited_once() - rows = session.execute.await_args.args[1] + # Two execute calls: the multi-row INSERT, then SELECT pg_notify(...). + assert session.execute.await_count == 2 + rows = session.execute.await_args_list[0].args[1] assert len(rows) == 3 assert all(r["queue"] == "orders" for r in rows) assert {r["payload"] for r in rows} == {b"a", b"b", b"c"} + notify_stmt, notify_params = session.execute.await_args_list[1].args + assert "pg_notify" in str(notify_stmt) + assert notify_params["payload"] == "orders" async def test_no_producer_methods_raise() -> None: From 67a7fe131e04a45f1b99e4019f04f8f081898a05 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 14:29:00 +0300 Subject: [PATCH 2/3] fix: don't leak _fetch_loop task when broker client is unpatched MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the LISTEN/NOTIFY refactor, ``_fetch_loop`` started with ``engine = self._client.engine``. The ``self._client`` property raises RuntimeError when the broker has no client — which happens during TestOutboxBroker teardown, when ``mock.patch.object(broker, "client", ...)`` reverses and ``broker.config.broker_config.client`` goes back to None. FastStream's ``TaskCallbackSupervisor`` (subscriber/supervisor.py) auto-restarts any task whose coroutine raises a non-CancelledError exception. So the dying ``_fetch_loop`` task triggered a fresh ``add_task`` from the supervisor's done callback. The new task was created against the dying event loop and never got to run, leaking as ``Task pending ... running at usecase.py:142`` warnings at GC time. Fix: read the client lazily inside the loop body and ``return`` cleanly when it's None. A clean exit doesn't invoke the supervisor's restart path. Also pass ``engine`` to ``_open_listen_connection`` directly so the listener-setup path doesn't go back through ``self._client`` either. The warnings only appeared when running all three test files together; each pair was clean. The trio's longer test stream just changed GC timing enough to surface what was always a latent bug. --- faststream_outbox/subscriber/usecase.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index c214d84..a070d11 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -52,7 +52,7 @@ from faststream._internal.endpoint.publisher import PublisherProto from faststream._internal.endpoint.subscriber.call_item import CallsCollection from faststream.message import StreamMessage - from sqlalchemy.ext.asyncio import AsyncConnection + from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine from faststream_outbox.client import OutboxClient from faststream_outbox.configs import OutboxBrokerConfig @@ -147,15 +147,22 @@ async def _fetch_loop(self) -> None: a persistent connection — uses ``client.fetch(...)`` for each iteration and never sets up LISTEN. The ``_notify_event`` simply never fires; behavior is polling-only. """ - engine = self._client.engine error_attempt = 0 while self.running: + # Read client lazily inside the loop: in the test broker path the client is + # patched in/out via mock.patch, so it can be None after teardown. Returning + # cleanly (rather than raising RuntimeError) prevents FastStream's supervisor + # from restarting the task and leaking a pending coroutine at GC time. + client = self._outer_config.client + if client is None: + return + engine = client.engine try: if engine is None: await self._fetch_inner(fetch_conn=None) else: async with engine.connect() as fetch_conn: - listen_conn = await self._open_listen_connection() + listen_conn = await self._open_listen_connection(engine) try: await self._fetch_inner(fetch_conn=fetch_conn) finally: @@ -216,7 +223,7 @@ async def _wait_for_notify_or_timeout(self, timeout: float) -> None: # noqa: AS await asyncio.wait_for(self._notify_event.wait(), timeout=timeout) self._notify_event.clear() - async def _open_listen_connection(self) -> "_asyncpg.Connection | None": + async def _open_listen_connection(self, engine: "AsyncEngine") -> "_asyncpg.Connection | None": """ Open a dedicated raw asyncpg connection and register LISTEN on it. @@ -228,10 +235,7 @@ async def _open_listen_connection(self) -> "_asyncpg.Connection | None": connection's reader task monopolize it — interleaving normal queries breaks notification delivery. """ - if _asyncpg is None: - return None - engine = self._client.engine - if engine is None or "asyncpg" not in (engine.url.drivername or ""): + if _asyncpg is None or "asyncpg" not in (engine.url.drivername or ""): return None # SQLAlchemy URL with the +asyncpg suffix isn't a valid raw asyncpg DSN; strip it. # ``str(url)`` hides the password — use ``render_as_string(hide_password=False)`` From bb52b855ff10508b1f09a997d6a8a4631c90d257 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 15:08:09 +0300 Subject: [PATCH 3/3] test: restore 100% coverage after LISTEN/NOTIFY refactor Drop two unreachable lines (the redundant empty-queue guard inside fetch_with_conn and the dead error_attempt reset that only ran on a shutdown-driven inner-loop return). Cover the LISTEN fallback paths with unit tests and replace the brittle pg_stat_activity probe in test_fetch_uses_persistent_connection with a direct mock.patch.object of fetch_with_conn that captures pg_backend_pid() per call -- strictly proves the persistent-connection claim and gets line coverage for free. Co-Authored-By: Claude Opus 4.7 --- faststream_outbox/client.py | 5 ++- faststream_outbox/subscriber/usecase.py | 1 - tests/test_fake.py | 13 ++++++- tests/test_integration.py | 52 +++++++++++-------------- tests/test_unit.py | 47 ++++++++++++++++++++-- 5 files changed, 82 insertions(+), 36 deletions(-) diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index 7c18d8e..d001631 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -82,9 +82,10 @@ async def fetch_with_conn( Same contract as :meth:`fetch`. Used by ``OutboxSubscriber._fetch_loop`` to reuse a long-lived connection instead of acquiring one per fetch from the pool. Each call opens its own transaction on *conn* via ``async with conn.begin():``. + + Callers must pass a non-empty *queues*; the empty-queue short-circuit lives in + :meth:`fetch`. """ - if not queues: - return [] token = uuid.uuid4() t = self._table diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index a070d11..58fa725 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -168,7 +168,6 @@ async def _fetch_loop(self) -> None: finally: if listen_conn is not None: await listen_conn.close() - error_attempt = 0 except Exception as e: # noqa: BLE001 self._log( log_level=logging.ERROR, diff --git a/tests/test_fake.py b/tests/test_fake.py index fe4db9a..a061820 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -15,7 +15,7 @@ make_outbox_table, ) from faststream_outbox.envelope import _encode_payload as encode_payload -from faststream_outbox.testing import _FakeRow +from faststream_outbox.testing import FakeOutboxClient, _FakeRow def _make_broker() -> OutboxBroker: @@ -580,3 +580,14 @@ async def handle(body: str) -> None: await _wait_until(lambda: received, timeout=3.0) assert received == ["via-router"] + + +async def test_fake_client_fetch_with_conn_mirrors_fetch() -> None: + fake = FakeOutboxClient() + fake.feed(queue="q", payload=b"x") + + rows = await fake.fetch_with_conn(None, ["q"], limit=10, lease_ttl_seconds=60.0) + + assert len(rows) == 1 + assert rows[0].queue == "q" + assert rows[0].payload == b"x" diff --git a/tests/test_integration.py b/tests/test_integration.py index b8c79bc..208a778 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -3,6 +3,7 @@ import asyncio import datetime as _dt import uuid +from unittest import mock import pytest from sqlalchemy import insert, select, text @@ -334,41 +335,34 @@ async def handle(body: dict) -> None: async def test_fetch_uses_persistent_connection(pg_engine, outbox_table) -> None: - """Repeated fetches on a long-lived connection must all land on the same backend pid.""" + """Every fetch must land on the same backend pid — proves the connection is reused.""" broker = OutboxBroker(pg_engine, outbox_table=outbox_table) - pids_seen: set[int] = set() + received: list[dict] = [] + fetch_pids: list[int] = [] + original_fetch_with_conn = OutboxClient.fetch_with_conn + + async def tracking_fetch_with_conn(self, conn, queues, *, limit, lease_ttl_seconds): + # Probe the pid in its own transaction so SQLAlchemy's autobegun txn is + # closed before original_fetch_with_conn opens its own ``async with conn.begin():``. + async with conn.begin(): + result = await conn.execute(text("SELECT pg_backend_pid()")) + fetch_pids.append(result.scalar_one()) + return await original_fetch_with_conn(self, conn, queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds) @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) async def handle(body: dict) -> None: - del body - async with pg_engine.connect() as conn: - # Record which backend pid the *subscriber's* fetch loop is using - # by reading the lock-holding pid via pg_stat_activity. Easier: just - # snapshot all backend pids touching our table and assert that one - # specific pid claims rows repeatedly. - result = await conn.execute( - text( - "SELECT pid FROM pg_stat_activity " - "WHERE state IN ('idle', 'idle in transaction') " - "AND query ILIKE :pattern" - ), - {"pattern": f"%{outbox_table.name}%"}, - ) - for (pid,) in result.all(): - pids_seen.add(pid) + received.append(body) session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) - async with broker: - for i in range(20): - async with session_factory() as session, session.begin(): - await broker.publish({"i": i}, queue="orders", session=session) - # Wait for all 20 to be processed - await _wait_until(lambda: len(pids_seen) > 0, timeout=5.0) - - # The fetch connection's pid must appear in the set we observed. This proves - # the subscriber held at least one persistent backend across many fetches — - # a per-call pool checkout would churn through many distinct pids on a small pool. - assert len(pids_seen) >= 1 + with mock.patch.object(OutboxClient, "fetch_with_conn", tracking_fetch_with_conn): + async with broker: + for i in range(5): + async with session_factory() as session, session.begin(): + await broker.publish({"i": i}, queue="orders", session=session) + await _wait_until(lambda: len(received) == 5, timeout=5.0) + + assert len(fetch_pids) >= 3, f"fetch_with_conn should run multiple times, got {len(fetch_pids)}" + assert len(set(fetch_pids)) == 1, f"persistent connection should hold one pid, saw {set(fetch_pids)}" async def test_notify_payload_carries_queue_name(pg_engine, outbox_table) -> None: diff --git a/tests/test_unit.py b/tests/test_unit.py index 61fa6f0..19b390e 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -1,6 +1,6 @@ import datetime as _dt import uuid -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, MagicMock, patch import pytest from sqlalchemy import MetaData @@ -18,6 +18,7 @@ from faststream_outbox.envelope import _encode_payload from faststream_outbox.message import OutboxInnerMessage, OutboxMessage from faststream_outbox.parser.parser import OutboxParser +from faststream_outbox.subscriber.usecase import OutboxSubscriber def _make_broker(engine: object | None = None, table_name: str = "outbox") -> OutboxBroker: @@ -667,8 +668,6 @@ async def test_outbox_broker_config_connect_disconnect_noop() -> None: async def test_subscriber_get_one_raises() -> None: - from unittest.mock import MagicMock # noqa: PLC0415 - metadata = MetaData() t = make_outbox_table(metadata) broker = OutboxBroker(outbox_table=t) @@ -681,3 +680,45 @@ async def handle(body: str) -> None: ... await sub.get_one() # _make_response_publisher returns () assert sub._make_response_publisher(MagicMock()) == () # noqa: SLF001 + + +# --- _open_listen_connection fallback paths --- + + +def _make_subscriber_for_listener_test() -> OutboxSubscriber: + broker = _make_broker() + + @broker.subscriber("orders") + async def handle(body: dict) -> None: ... + + return next(iter(broker._subscribers)) # noqa: SLF001 + + +async def test_open_listen_connection_returns_none_for_non_asyncpg_driver() -> None: + sub = _make_subscriber_for_listener_test() + engine = MagicMock() + engine.url.drivername = "postgresql" # no +asyncpg suffix + + result = await sub._open_listen_connection(engine) # noqa: SLF001 + + assert result is None + + +async def test_open_listen_connection_returns_none_when_asyncpg_connect_fails() -> None: + sub = _make_subscriber_for_listener_test() + engine = MagicMock() + engine.url.drivername = "postgresql+asyncpg" + engine.url.set.return_value.render_as_string.return_value = "postgresql://u:p@h/db" + + with ( + patch.object(sub, "_log") as log_mock, + patch( + "faststream_outbox.subscriber.usecase._asyncpg.connect", + new=AsyncMock(side_effect=OSError("boom")), + ), + ): + result = await sub._open_listen_connection(engine) # noqa: SLF001 + + assert result is None + log_mock.assert_called_once() + assert "LISTEN setup failed" in log_mock.call_args.kwargs["message"]