diff --git a/CLAUDE.md b/CLAUDE.md
index ae992a3..c5ec16c 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -35,9 +35,13 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr

 ### Two-loop subscriber (`subscriber/usecase.py`)

 Per subscriber:
-1. **`_fetch_loop`** — single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. Adaptive idle backoff with jitter (capped by `_BACKOFF_EXP_CAP=30`); separate exponential backoff on fetch errors.
+1. **`_fetch_loop`** — owns a long-lived `AsyncConnection` for the fetch CTE and a separate raw asyncpg connection for `LISTEN outbox_<table>`. Single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. The idle sleep is short-circuited by NOTIFY via an `asyncio.Event` — idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error), the loop logs once and falls back to polling. On any DB error the connections are closed, the loop backs off exponentially (capped by `_BACKOFF_EXP_CAP=30`), and reopens. The test broker (no real engine) skips the persistent-connection / LISTEN path entirely and uses `client.fetch(...)` per iteration.
 2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`.
+
+Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_<table>', queue)` on the caller's session right after the INSERT. NOTIFY is transactional — listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.
+
+Channel naming convention: `outbox_<table>`. Postgres limits identifiers to 63 bytes, so users with table names longer than ~56 bytes will silently lose the NOTIFY wake-up and degrade to polling.

 ### Lease-token invariant — load-bearing
 Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `acquired_token`. If a slow handler's lease expired and a newer fetch reclaimed the row with a fresh token, the slow handler's `DELETE`/`UPDATE` finds `rowcount == 0` and is silently dropped — preventing it from clobbering the new lease holder. Any new fetch/terminal path must preserve this.
diff --git a/README.md b/README.md
index 85e0be6..12e2292 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ async with session_factory() as session, session.begin():

 A subscriber owns two async loops:

-1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed.
+1. **fetch** — claims available rows via `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *` in a single CTE. A row is "available" iff its lease is unset *or* expired (`acquired_at < now() - lease_ttl_seconds`), so the fetch query reclaims stuck rows inline — no separate reaper is needed. With the asyncpg driver, the loop also `LISTEN`s on `outbox_<table>` and `publish` emits `pg_notify(...)`, so idle dispatch latency is sub-100ms instead of up to `max_fetch_interval`. Polling stays as the fallback.
 2. **workers** (× `max_workers`) — dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`. On failure, the retry strategy decides: schedule another attempt, or terminal `DELETE`.

 The `acquired_token` is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal `DELETE`/`UPDATE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row.
diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py
index df6700b..5c176e1 100644
--- a/faststream_outbox/broker.py
+++ b/faststream_outbox/broker.py
@@ -22,7 +22,7 @@ from faststream._internal.types import BrokerMiddleware, CustomCallable
 from faststream.specification.schema import BrokerSpec
 from faststream.specification.schema.extra import Tag, TagDict

-from sqlalchemy import insert
+from sqlalchemy import insert, text
 from sqlalchemy.ext.asyncio import AsyncSession

 from faststream_outbox.client import OutboxClient
@@ -208,6 +208,7 @@ async def publish(  # ty: ignore[invalid-method-override]
             raise TypeError(msg)
         payload, hdrs = _encode_payload(body, headers=headers, correlation_id=correlation_id)
         await session.execute(insert(self._outbox_table).values(queue=queue, payload=payload, headers=hdrs))
+        await self._notify(session, queue)

     async def publish_batch(  # ty: ignore[invalid-method-override]
         self,
@@ -232,6 +233,23 @@
             payload, hdrs = _encode_payload(body, headers=headers)
             rows.append({"queue": queue, "payload": payload, "headers": hdrs})
         await session.execute(insert(self._outbox_table), rows)
+        await self._notify(session, queue)
+
+    async def _notify(self, session: AsyncSession, queue: str) -> None:
+        """
+        Emit ``pg_notify('outbox_<table>', queue)`` so listening subscribers wake immediately.
+
+        Uses ``pg_notify(...)`` rather than raw ``NOTIFY`` so the channel and payload
+        bind cleanly as parameters (raw NOTIFY accepts only literals — injection-prone).
+        Runs on the caller's session so the NOTIFY commits with the row insert; if the
+        caller's transaction rolls back, the NOTIFY is silently discarded by Postgres.
+        With no matching LISTEN on the channel, Postgres simply drops the NOTIFY, so
+        it is harmless when no subscriber is running.
+        """
+        await session.execute(
+            text("SELECT pg_notify(:channel, :payload)"),
+            {"channel": f"outbox_{self._outbox_table.name}", "payload": queue},
+        )

     async def request(self, *args: typing.Any, **kwargs: typing.Any) -> typing.NoReturn:
         msg = "OutboxBroker does not support request-reply"
diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py
index b6b1f31..d001631 100644
--- a/faststream_outbox/client.py
+++ b/faststream_outbox/client.py
@@ -39,7 +39,7 @@ from collections.abc import Sequence

     from sqlalchemy import Connection, Table
-    from sqlalchemy.ext.asyncio import AsyncEngine
+    from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine


 @dataclass(frozen=True)
@@ -58,23 +58,34 @@ def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None:
     def table(self) -> "Table":
         return self._table

-    async def fetch(
+    @property
+    def engine(self) -> "AsyncEngine":
+        """
+        The underlying ``AsyncEngine``.
+
+        Used by the subscriber loop to open its own long-lived fetch connection and to
+        drive ``LISTEN/NOTIFY``.
+        """
+        return self._engine
+
+    async def fetch_with_conn(
         self,
+        conn: "AsyncConnection",
         queues: "Sequence[str]",
         *,
         limit: int,
         lease_ttl_seconds: float,
     ) -> list[OutboxInnerMessage]:
         """
-        Atomically claim up to *limit* available rows for the given queue names.
+        Run the fetch CTE on a caller-supplied connection.

-        A row is available iff its lease is unset (``acquired_token IS NULL``) or its
-        lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each
-        carries ``acquired_token`` which the worker loop must echo back on the terminal
-        ``DELETE``/``UPDATE``.
+        Same contract as :meth:`fetch`. Used by ``OutboxSubscriber._fetch_loop`` to reuse
+        a long-lived connection instead of acquiring one per fetch from the pool. Each
+        call opens its own transaction on *conn* via ``async with conn.begin():``.
+
+        Callers must pass a non-empty *queues*; the empty-queue short-circuit lives in
+        :meth:`fetch`.
         """
-        if not queues:
-            return []
         token = uuid.uuid4()
         t = self._table

@@ -103,11 +114,40 @@
             )
             .returning(*t.c)
         )
-        async with self._engine.begin() as conn:
+        async with conn.begin():
             result = await conn.execute(stmt, {"lease_ttl": max(0.0, lease_ttl_seconds)})
             rows = result.mappings().all()
         return [_row_to_message(dict(row)) for row in rows]

+    async def fetch(
+        self,
+        queues: "Sequence[str]",
+        *,
+        limit: int,
+        lease_ttl_seconds: float,
+    ) -> list[OutboxInnerMessage]:
+        """
+        Atomically claim up to *limit* available rows for the given queue names.
+
+        A row is available iff its lease is unset (``acquired_token IS NULL``) or its
+        lease is older than *lease_ttl_seconds*. Returns the freshly-leased rows; each
+        carries ``acquired_token`` which the worker loop must echo back on the terminal
+        ``DELETE``/``UPDATE``.
+
+        Convenience wrapper that opens a one-shot connection and delegates to
+        :meth:`fetch_with_conn`. The subscriber loop uses ``fetch_with_conn`` directly
+        with a long-lived connection.
+ """ + if not queues: + return [] + async with self._engine.connect() as conn: + return await self.fetch_with_conn( + conn, + queues, + limit=limit, + lease_ttl_seconds=lease_ttl_seconds, + ) + async def delete_with_lease(self, message_id: int, acquired_token: uuid.UUID) -> bool: """Delete *message_id* iff it still holds *acquired_token*. Returns True if deleted.""" t = self._table diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 6fe8833..58fa725 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -6,11 +6,19 @@ * ``_fetch_loop`` claims available rows from Postgres and pushes them onto an in-process queue. A row is available iff its lease is unset *or* expired (``acquired_at < now() - lease_ttl_seconds``); the fetch CTE reclaims both cases - in one round-trip. Adaptive idle backoff with jitter when the queue is empty. + in one round-trip. Adaptive idle backoff with jitter when the queue is empty — + but the sleep is short-circuited by ``LISTEN/NOTIFY`` when the asyncpg driver + is in use, dropping idle dispatch latency from up to ``max_fetch_interval`` to + ~10ms. Polling stays as the fallback if ``LISTEN`` setup fails. * ``_worker_loop`` (one per ``max_workers``) pulls rows, dispatches via ``consume()``, and writes the terminal state back through the client. Every terminal write is filtered by ``acquired_token`` so a row whose lease was reclaimed by a newer fetch doesn't get clobbered by the stale handler. + +The fetch loop owns a long-lived ``AsyncConnection`` (used for the fetch CTE) and, +when asyncpg is available, a separate raw asyncpg connection dedicated to LISTEN. +On any error the connections are closed, the loop backs off, then both are reopened +in the next iteration. """ import asyncio @@ -31,6 +39,12 @@ from faststream_outbox.subscriber.config import OutboxSubscriberConfig, OutboxSubscriberSpecificationConfig +try: + import asyncpg as _asyncpg +except ImportError: # pragma: no cover + _asyncpg = None # ty: ignore[invalid-assignment] + + _BACKOFF_EXP_CAP = 30 @@ -38,6 +52,7 @@ from faststream._internal.endpoint.publisher import PublisherProto from faststream._internal.endpoint.subscriber.call_item import CallsCollection from faststream.message import StreamMessage + from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine from faststream_outbox.client import OutboxClient from faststream_outbox.configs import OutboxBrokerConfig @@ -82,6 +97,10 @@ def __init__( self._inflight: asyncio.Queue[OutboxInnerMessage] = asyncio.Queue( maxsize=config.fetch_batch_size, ) + # Set by the LISTEN callback to wake _fetch_loop early; cleared after each + # wakeup. When LISTEN is unavailable the event simply never fires and the + # loop sleeps the full adaptive interval. + self._notify_event: asyncio.Event = asyncio.Event() @property def _client(self) -> "OutboxClient": @@ -110,31 +129,84 @@ async def stop(self) -> None: with anyio.move_on_after(self._outer_config.graceful_timeout): await super().stop() + @property + def _notify_channel(self) -> str: + """ + LISTEN channel name. + + One channel per outbox table; subscribers ignore queues they don't care + about (cheap — wake-up does an empty fetch and goes back to sleep). + """ + return f"outbox_{self._client.table.name}" + async def _fetch_loop(self) -> None: + """ + Outer loop: own connection lifecycle, back off and reconnect on error. 
+
+        When the client has no real engine (test broker), drives the inner loop without
+        a persistent connection — uses ``client.fetch(...)`` for each iteration and never
+        sets up LISTEN. The ``_notify_event`` simply never fires; behavior is polling-only.
+        """
+        error_attempt = 0
+        while self.running:
+            # Read client lazily inside the loop: in the test broker path the client is
+            # patched in/out via mock.patch, so it can be None after teardown. Returning
+            # cleanly (rather than raising RuntimeError) prevents FastStream's supervisor
+            # from restarting the task and leaking a pending coroutine at GC time.
+            client = self._outer_config.client
+            if client is None:
+                return
+            engine = client.engine
+            try:
+                if engine is None:
+                    await self._fetch_inner(fetch_conn=None)
+                else:
+                    async with engine.connect() as fetch_conn:
+                        listen_conn = await self._open_listen_connection(engine)
+                        try:
+                            await self._fetch_inner(fetch_conn=fetch_conn)
+                        finally:
+                            if listen_conn is not None:
+                                await listen_conn.close()
+            except Exception as e:  # noqa: BLE001
+                self._log(
+                    log_level=logging.ERROR,
+                    message=f"Outbox fetch loop error: {e!r}; reconnecting",
+                    exc_info=e,
+                )
+                error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP)
+                delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0)  # noqa: S311
+                await anyio.sleep(delay)
+
+    async def _fetch_inner(self, *, fetch_conn: "AsyncConnection | None") -> None:
+        """
+        Fetch + adaptive backoff, with NOTIFY-driven wakeup.
+
+        Returns when ``self.running`` goes False, or raises on any DB error so the outer
+        loop can rebuild the connection.
+        """
         base = self._config.min_fetch_interval
         max_idle = self._config.max_fetch_interval
         idle_count = 0
-        error_attempt = 0
-
         while self.running:
             free = self._inflight.maxsize - self._inflight.qsize()
             if free <= 0:
-                await anyio.sleep(base)
+                await self._wait_for_notify_or_timeout(base)
                 continue

-            try:
+            limit = min(free, self._config.fetch_batch_size)
+            if fetch_conn is None:
                 rows = await self._client.fetch(
                     self._queues,
-                    limit=min(free, self._config.fetch_batch_size),
+                    limit=limit,
+                    lease_ttl_seconds=self._config.lease_ttl_seconds,
+                )
+            else:
+                rows = await self._client.fetch_with_conn(
+                    fetch_conn,
+                    self._queues,
+                    limit=limit,
                     lease_ttl_seconds=self._config.lease_ttl_seconds,
                 )
-            except Exception as e:  # noqa: BLE001
-                self._log(log_level=logging.ERROR, message=f"Outbox fetch error: {e!r}", exc_info=e)
-                error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP)
-                delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0)  # noqa: S311
-                await anyio.sleep(delay)
-                continue  # pragma: no cover -- coverage misses bare-continue-after-await
-
-            error_attempt = 0
             if rows:
                 idle_count = 0
                 for row in rows:
@@ -142,7 +214,52 @@ async def _fetch_loop(self) -> None:
             else:
                 idle_count = min(idle_count + 1, _BACKOFF_EXP_CAP)
                 delay = min(base * (2.0 ** (idle_count - 1)) * random.uniform(0.5, 1.5), max_idle)  # noqa: S311
-                await anyio.sleep(delay)
+                await self._wait_for_notify_or_timeout(delay)
+
+    async def _wait_for_notify_or_timeout(self, timeout: float) -> None:  # noqa: ASYNC109
+        """Sleep up to *timeout* seconds, but wake immediately on a NOTIFY."""
+        with suppress(TimeoutError):
+            await asyncio.wait_for(self._notify_event.wait(), timeout=timeout)
+        self._notify_event.clear()
+
+    async def _open_listen_connection(self, engine: "AsyncEngine") -> "_asyncpg.Connection | None":
+        """
+        Open a dedicated raw asyncpg connection and register LISTEN on it.
+
+        Returns the connection on success, ``None`` on any failure (asyncpg not installed,
+        non-asyncpg driver, permission error, network problem). The fetch loop falls back
+        to polling-only behavior in that case.
+
+        A separate connection is required because asyncpg's ``add_listener`` makes the
+        connection's reader task monopolize it — interleaving normal queries breaks
+        notification delivery.
+        """
+        if _asyncpg is None or "asyncpg" not in (engine.url.drivername or ""):
+            return None
+        # SQLAlchemy URL with the +asyncpg suffix isn't a valid raw asyncpg DSN; strip it.
+        # ``str(url)`` hides the password — use ``render_as_string(hide_password=False)``
+        # so asyncpg.connect actually sees the credentials.
+        dsn = engine.url.set(drivername="postgresql").render_as_string(hide_password=False)
+        try:
+            conn = await _asyncpg.connect(dsn)
+            await conn.add_listener(self._notify_channel, self._on_notify)
+        except Exception as e:  # noqa: BLE001
+            self._log(
+                log_level=logging.WARNING,
+                message=f"LISTEN setup failed; falling back to polling: {e!r}",
+                exc_info=e,
+            )
+            return None
+        return conn
+
+    def _on_notify(self, *_args: object) -> None:
+        """
+        Asyncpg notification callback: ``(connection, pid, channel, payload)``.
+
+        We only need the wake-up signal; payload is ignored. Setting an ``asyncio.Event``
+        from the asyncpg reader task is safe — it runs on the same event loop.
+        """
+        self._notify_event.set()

     async def _worker_loop(self) -> None:
         logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None
diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py
index 9ec399f..68f9f70 100644
--- a/faststream_outbox/testing.py
+++ b/faststream_outbox/testing.py
@@ -79,6 +79,22 @@ def rows(self) -> list[_FakeRow]:
     def table(self) -> typing.Any:
         return None

+    @property
+    def engine(self) -> None:
+        """No real engine — signals the subscriber loop to use the polling-only path."""
+        return None
+
+    async def fetch_with_conn(
+        self,
+        conn: typing.Any,  # noqa: ARG002
+        queues: "Sequence[str]",
+        *,
+        limit: int,
+        lease_ttl_seconds: float,
+    ) -> list[OutboxInnerMessage]:
+        """Mirror :meth:`OutboxClient.fetch_with_conn`; *conn* is ignored by the fake."""
+        return await self.fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)
+
     async def fetch(
         self,
         queues: "Sequence[str]",
diff --git a/tests/test_fake.py b/tests/test_fake.py
index fe4db9a..a061820 100644
--- a/tests/test_fake.py
+++ b/tests/test_fake.py
@@ -15,7 +15,7 @@
     make_outbox_table,
 )
 from faststream_outbox.envelope import _encode_payload as encode_payload
-from faststream_outbox.testing import _FakeRow
+from faststream_outbox.testing import FakeOutboxClient, _FakeRow


 def _make_broker() -> OutboxBroker:
@@ -580,3 +580,14 @@ async def handle(body: str) -> None:
     await _wait_until(lambda: received, timeout=3.0)

     assert received == ["via-router"]
+
+
+async def test_fake_client_fetch_with_conn_mirrors_fetch() -> None:
+    fake = FakeOutboxClient()
+    fake.feed(queue="q", payload=b"x")
+
+    rows = await fake.fetch_with_conn(None, ["q"], limit=10, lease_ttl_seconds=60.0)
+
+    assert len(rows) == 1
+    assert rows[0].queue == "q"
+    assert rows[0].payload == b"x"
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 71a2a8d..208a778 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -3,6 +3,7 @@
 import asyncio
 import datetime as _dt
 import uuid
+from unittest import mock

 import pytest
 from sqlalchemy import insert, select, text
@@ -307,3 +308,88 @@ async def test_publish_batch_inserts_all_rows(pg_engine, outbox_table) -> None:
     )

     assert await _row_count(pg_engine, outbox_table) == 3
+
+
+async def test_notify_wakes_subscriber_well_before_polling_interval(pg_engine, outbox_table) -> None:
+    """LISTEN/NOTIFY must dispatch a freshly-published row long before the polling sleep elapses."""
+    received: list[dict] = []
+    broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+
+    # Polling sleep ceiling is 10s; if NOTIFY works, dispatch happens in <500ms.
+    @broker.subscriber("orders", min_fetch_interval=10.0, max_fetch_interval=10.0)
+    async def handle(body: dict) -> None:
+        received.append(body)
+
+    session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+    async with broker:
+        # Let the subscriber settle into its idle wait so we know it's blocked on the
+        # polling sleep when NOTIFY arrives — proves the wake-up actually shortcuts.
+        await asyncio.sleep(0.5)
+        async with session_factory() as session, session.begin():
+            await broker.publish({"order_id": 1}, queue="orders", session=session)
+        # If NOTIFY wakeup works, this returns in tens of milliseconds. If it doesn't,
+        # this would block for ~10s. A 2s budget cleanly distinguishes the two.
+        await _wait_until(lambda: received, timeout=2.0)
+
+    assert received == [{"order_id": 1}]
+
+
+async def test_fetch_uses_persistent_connection(pg_engine, outbox_table) -> None:
+    """Every fetch must land on the same backend pid — proves the connection is reused."""
+    broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+    received: list[dict] = []
+    fetch_pids: list[int] = []
+    original_fetch_with_conn = OutboxClient.fetch_with_conn
+
+    async def tracking_fetch_with_conn(self, conn, queues, *, limit, lease_ttl_seconds):
+        # Probe the pid in its own transaction so SQLAlchemy's autobegun txn is
+        # closed before original_fetch_with_conn opens its own ``async with conn.begin():``.
+        async with conn.begin():
+            result = await conn.execute(text("SELECT pg_backend_pid()"))
+            fetch_pids.append(result.scalar_one())
+        return await original_fetch_with_conn(self, conn, queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds)
+
+    @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
+    async def handle(body: dict) -> None:
+        received.append(body)
+
+    session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+    with mock.patch.object(OutboxClient, "fetch_with_conn", tracking_fetch_with_conn):
+        async with broker:
+            for i in range(5):
+                async with session_factory() as session, session.begin():
+                    await broker.publish({"i": i}, queue="orders", session=session)
+            await _wait_until(lambda: len(received) == 5, timeout=5.0)
+
+    assert len(fetch_pids) >= 3, f"fetch_with_conn should run multiple times, got {len(fetch_pids)}"
+    assert len(set(fetch_pids)) == 1, f"persistent connection should hold one pid, saw {set(fetch_pids)}"
+
+
+async def test_notify_payload_carries_queue_name(pg_engine, outbox_table) -> None:
+    """The NOTIFY payload Postgres delivers to LISTEN clients must equal the queue name."""
+    received_payloads: list[str] = []
+    received_event = asyncio.Event()
+
+    # Open a raw asyncpg listener on the same channel the broker NOTIFYs on.
+    import asyncpg  # noqa: PLC0415
+
+    dsn = pg_engine.url.set(drivername="postgresql").render_as_string(hide_password=False)
+    listener = await asyncpg.connect(dsn)
+    try:
+
+        def _cb(_conn, _pid, _channel, payload) -> None:
+            received_payloads.append(payload)
+            received_event.set()
+
+        await listener.add_listener(f"outbox_{outbox_table.name}", _cb)
+
+        broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+        session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+        async with session_factory() as session, session.begin():
+            await broker.publish({"x": 1}, queue="orders", session=session)
+        # Wait for the NOTIFY to land on our listener
+        await asyncio.wait_for(received_event.wait(), timeout=2.0)
+    finally:
+        await listener.close()
+
+    assert received_payloads == ["orders"]
diff --git a/tests/test_unit.py b/tests/test_unit.py
index 843131b..19b390e 100644
--- a/tests/test_unit.py
+++ b/tests/test_unit.py
@@ -1,6 +1,6 @@
 import datetime as _dt
 import uuid
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, MagicMock, patch

 import pytest
 from sqlalchemy import MetaData
@@ -18,6 +18,7 @@
 from faststream_outbox.envelope import _encode_payload
 from faststream_outbox.message import OutboxInnerMessage, OutboxMessage
 from faststream_outbox.parser.parser import OutboxParser
+from faststream_outbox.subscriber.usecase import OutboxSubscriber


 def _make_broker(engine: object | None = None, table_name: str = "outbox") -> OutboxBroker:
@@ -274,19 +275,24 @@ async def test_broker_publish_rejects_non_async_session() -> None:
         await broker.publish(b"x", queue="orders", session=object())  # ty: ignore[invalid-argument-type]


-async def test_broker_publish_executes_insert_on_session() -> None:
+async def test_broker_publish_executes_insert_then_pg_notify_on_session() -> None:
     from sqlalchemy.ext.asyncio import AsyncSession  # noqa: PLC0415

     broker = _make_broker()
     session = AsyncMock(spec=AsyncSession)
     await broker.publish({"order_id": 1}, queue="orders", session=session)
-    session.execute.assert_awaited_once()
-    stmt = session.execute.await_args.args[0]
-    assert "INSERT INTO" in str(stmt)
-    params = stmt.compile().params
+    # Two execute calls: the INSERT, then SELECT pg_notify(...).
+    assert session.execute.await_count == 2
+    insert_stmt = session.execute.await_args_list[0].args[0]
+    assert "INSERT INTO" in str(insert_stmt)
+    params = insert_stmt.compile().params
     assert params["queue"] == "orders"
     assert params["payload"] == b'{"order_id": 1}'
     assert params["headers"]["content-type"] == "application/json"
+    notify_stmt, notify_params = session.execute.await_args_list[1].args
+    assert "pg_notify" in str(notify_stmt)
+    assert notify_params["channel"] == "outbox_outbox"
+    assert notify_params["payload"] == "orders"


 async def test_broker_publish_does_not_commit() -> None:
@@ -373,11 +379,15 @@ async def test_broker_publish_batch_executes_single_insert_for_many_rows() -> No
     broker = _make_broker()
     session = AsyncMock(spec=AsyncSession)
     await broker.publish_batch(b"a", b"b", b"c", queue="orders", session=session)
-    session.execute.assert_awaited_once()
-    rows = session.execute.await_args.args[1]
+    # Two execute calls: the multi-row INSERT, then SELECT pg_notify(...).
+    assert session.execute.await_count == 2
+    rows = session.execute.await_args_list[0].args[1]
     assert len(rows) == 3
     assert all(r["queue"] == "orders" for r in rows)
     assert {r["payload"] for r in rows} == {b"a", b"b", b"c"}
+    notify_stmt, notify_params = session.execute.await_args_list[1].args
+    assert "pg_notify" in str(notify_stmt)
+    assert notify_params["payload"] == "orders"


 async def test_no_producer_methods_raise() -> None:
@@ -658,8 +668,6 @@ async def test_outbox_broker_config_connect_disconnect_noop() -> None:


 async def test_subscriber_get_one_raises() -> None:
-    from unittest.mock import MagicMock  # noqa: PLC0415
-
     metadata = MetaData()
     t = make_outbox_table(metadata)
     broker = OutboxBroker(outbox_table=t)
@@ -672,3 +680,45 @@ async def handle(body: str) -> None: ...
         await sub.get_one()
     # _make_response_publisher returns ()
     assert sub._make_response_publisher(MagicMock()) == ()  # noqa: SLF001
+
+
+# --- _open_listen_connection fallback paths ---
+
+
+def _make_subscriber_for_listener_test() -> OutboxSubscriber:
+    broker = _make_broker()
+
+    @broker.subscriber("orders")
+    async def handle(body: dict) -> None: ...
+
+    return next(iter(broker._subscribers))  # noqa: SLF001
+
+
+async def test_open_listen_connection_returns_none_for_non_asyncpg_driver() -> None:
+    sub = _make_subscriber_for_listener_test()
+    engine = MagicMock()
+    engine.url.drivername = "postgresql"  # no +asyncpg suffix
+
+    result = await sub._open_listen_connection(engine)  # noqa: SLF001
+
+    assert result is None
+
+
+async def test_open_listen_connection_returns_none_when_asyncpg_connect_fails() -> None:
+    sub = _make_subscriber_for_listener_test()
+    engine = MagicMock()
+    engine.url.drivername = "postgresql+asyncpg"
+    engine.url.set.return_value.render_as_string.return_value = "postgresql://u:p@h/db"
+
+    with (
+        patch.object(sub, "_log") as log_mock,
+        patch(
+            "faststream_outbox.subscriber.usecase._asyncpg.connect",
+            new=AsyncMock(side_effect=OSError("boom")),
+        ),
+    ):
+        result = await sub._open_listen_connection(engine)  # noqa: SLF001
+
+    assert result is None
+    log_mock.assert_called_once()
+    assert "LISTEN setup failed" in log_mock.call_args.kwargs["message"]
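
A minimal, self-contained sketch of the NOTIFY transactionality the producer side relies on, using only public asyncpg API. The DSN and the `outbox_outbox` channel name (matching the default `outbox` table) are illustrative assumptions, not package code:

```python
import asyncio

import asyncpg


async def main() -> None:
    dsn = "postgresql://user:pass@localhost/db"  # hypothetical DSN
    listener = await asyncpg.connect(dsn)
    seen: list[str] = []

    # asyncpg passes (connection, pid, channel, payload) to listener callbacks.
    def on_notify(conn, pid, channel, payload) -> None:
        seen.append(payload)

    await listener.add_listener("outbox_outbox", on_notify)

    sender = await asyncpg.connect(dsn)

    # Rolled back: Postgres discards the queued NOTIFY with the transaction.
    tr = sender.transaction()
    await tr.start()
    await sender.execute("SELECT pg_notify($1, $2)", "outbox_outbox", "orders")
    await tr.rollback()

    # Committed: listeners see the NOTIFY only after COMMIT.
    async with sender.transaction():
        await sender.execute("SELECT pg_notify($1, $2)", "outbox_outbox", "orders")

    await asyncio.sleep(0.2)  # give the listener's reader task time to deliver
    assert seen == ["orders"]  # exactly one NOTIFY, from the committed transaction

    await listener.close()
    await sender.close()


asyncio.run(main())
```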
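The lease-token invariant can be exercised directly against `OutboxClient`. A sketch, assuming `OutboxInnerMessage` exposes `id` and `acquired_token` (the worker-loop contract implies both) and that `lease_ttl_seconds=0.0` makes a fresh lease immediately reclaimable:

```python
import uuid

from faststream_outbox.client import OutboxClient


async def stale_handler_is_a_noop(client: OutboxClient) -> None:
    # Claim one pending "orders" row; a zero TTL stands in for an expired lease.
    (first,) = await client.fetch(["orders"], limit=1, lease_ttl_seconds=0.0)
    stale_token: uuid.UUID = first.acquired_token  # assumed attribute name

    # A second fetch reclaims the same row under a fresh token.
    (second,) = await client.fetch(["orders"], limit=1, lease_ttl_seconds=0.0)
    assert second.acquired_token != stale_token

    # The stale handler's terminal DELETE echoes its old token and matches
    # nothing: rowcount == 0, so the new lease holder's row stays untouched.
    deleted = await client.delete_with_lease(first.id, stale_token)
    assert deleted is False
```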
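The 63-byte channel ceiling noted in CLAUDE.md can be guarded where the outbox table is defined. A hypothetical helper (`outbox_channel_or_raise` is not part of the package):

```python
PG_MAX_IDENTIFIER_BYTES = 63  # Postgres truncates longer identifiers


def outbox_channel_or_raise(table_name: str) -> str:
    """Fail fast instead of silently degrading to polling."""
    channel = f"outbox_{table_name}"
    if len(channel.encode("utf-8")) > PG_MAX_IDENTIFIER_BYTES:
        msg = f"{channel!r} exceeds {PG_MAX_IDENTIFIER_BYTES} bytes; LISTEN and NOTIFY would stop pairing up"
        raise ValueError(msg)
    return channel


assert outbox_channel_or_raise("outbox") == "outbox_outbox"
```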