feat: persistent fetch connection + LISTEN/NOTIFY wakeups #4

Merged: lesnik512 merged 3 commits into main from feat/listen-notify-persistent-conn, May 7, 2026
Conversation

@lesnik512 (Member)

Two perf items collapse into one architectural change. The subscriber's fetch loop now owns:

  1. A long-lived `AsyncConnection` used for the fetch CTE — replaces the per-call `async with engine.begin()` pool checkout.
  2. A separate raw asyncpg connection running `LISTEN outbox_<table>`, driven via asyncpg's native `add_listener`. A dedicated connection is required because asyncpg's listener task monopolizes its socket; mixing transactional queries on the same connection would break notification delivery.
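The dedicated listener connection can be sketched roughly as below. This is an illustration, not the library's actual code: `open_listen_connection`, `listen_channel`, and the `wakeup` event are hypothetical names; only the asyncpg calls (`asyncpg.connect`, `Connection.add_listener`) and the `outbox_<table>` channel convention come from the PR.

```python
import asyncio

def listen_channel(table: str) -> str:
    # Channel-name convention from the PR: one NOTIFY channel per outbox table.
    return f"outbox_{table}"

async def open_listen_connection(dsn: str, table: str, wakeup: asyncio.Event):
    """Open a dedicated raw asyncpg connection and LISTEN on it (sketch).

    The connection must not be shared with transactional queries:
    asyncpg's listener task owns the socket for notification delivery.
    """
    import asyncpg  # lazy import: the subscriber degrades to polling if missing

    conn = await asyncpg.connect(dsn)

    def on_notify(connection, pid, channel, payload):
        # Runs in asyncpg's reader task; just flip the event so the
        # fetch loop's idle sleep is short-circuited.
        wakeup.set()

    await conn.add_listener(listen_channel(table), on_notify)
    return conn
```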

`broker.publish` and `publish_batch` now emit `SELECT pg_notify('outbox_<table>', queue)` after the INSERT, on the caller's session. NOTIFY is transactional in Postgres, so the listener only sees it after the user's transaction commits — atomicity with the row insert is automatic, and rollbacks silently drop the NOTIFY.
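The transactional semantics look like this at the SQL level (table and column names are illustrative, not the library's schema):

```sql
BEGIN;
INSERT INTO outbox_messages (queue, payload) VALUES ('orders', '{"id": 1}');
SELECT pg_notify('outbox_messages', 'orders');  -- queued, not yet delivered
COMMIT;  -- listeners receive the notification only at commit
-- On ROLLBACK, both the row and the queued NOTIFY are discarded together.
```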

The fetch loop's idle-sleep is short-circuited by an `asyncio.Event` the LISTEN callback sets. Idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error, network blip), the subscriber logs once and degrades gracefully to today's polling behavior. On any DB error inside the fetch loop, both connections are closed, the loop backs off exponentially, then both reopen.
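The event-driven short-circuit is a standard `asyncio.wait_for` pattern. A minimal, self-contained sketch (function names are illustrative; only the `asyncio.Event` + `max_fetch_interval` mechanics come from the PR):

```python
import asyncio

async def idle_sleep(wakeup: asyncio.Event, max_fetch_interval: float) -> bool:
    """Block until the next poll is due, but wake early on NOTIFY.

    Returns True if woken by a notification, False on plain timeout.
    """
    try:
        await asyncio.wait_for(wakeup.wait(), timeout=max_fetch_interval)
    except asyncio.TimeoutError:
        return False  # no NOTIFY: fall back to the regular polling cadence
    wakeup.clear()  # consume the wakeup so the next idle period blocks again
    return True

async def demo():
    ev = asyncio.Event()
    # Timeout path: nothing sets the event within the interval.
    timed_out = await idle_sleep(ev, 0.01)
    # Wakeup path: simulate a LISTEN callback firing mid-sleep.
    asyncio.get_running_loop().call_later(0.01, ev.set)
    woken = await idle_sleep(ev, 5.0)
    return timed_out, woken

print(asyncio.run(demo()))  # → (False, True)
```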

Test broker (`FakeOutboxClient`) signals "no real engine" by returning `None` from `client.engine`; the subscriber takes a polling-only path and never opens persistent connections or LISTENs. Existing fake tests are untouched. New integration tests cover NOTIFY-driven wakeup, NOTIFY payload contents, and persistent-connection reuse.

Pool sizing implication for users: each active subscriber holds 2 connections (1 from the SQLAlchemy pool, 1 raw asyncpg), so size `pool_size >= num_subscribers + max_workers` (the raw asyncpg connection lives outside the pool).
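The sizing rule as arithmetic, for clarity (the helper is hypothetical, not part of the library):

```python
def required_pool_size(num_subscribers: int, max_workers: int) -> int:
    """Minimum SQLAlchemy pool_size under the PR's connection model.

    Each subscriber pins 1 pooled connection for the fetch CTE; handler
    workers each check out a pooled connection. The raw asyncpg LISTEN
    connection is outside the pool, so it does not count here.
    """
    return num_subscribers + max_workers

print(required_pool_size(3, 5))  # → 8
```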

@lesnik512 lesnik512 self-assigned this May 7, 2026
lesnik512 and others added 2 commits May 7, 2026 14:29
After the LISTEN/NOTIFY refactor, ``_fetch_loop`` started with
``engine = self._client.engine``. The ``self._client`` property
raises RuntimeError when the broker has no client — which happens
during TestOutboxBroker teardown, when ``mock.patch.object(broker,
"client", ...)`` reverses and ``broker.config.broker_config.client``
goes back to None.

FastStream's ``TaskCallbackSupervisor`` (subscriber/supervisor.py)
auto-restarts any task whose coroutine raises a non-CancelledError
exception. So the dying ``_fetch_loop`` task triggered a fresh
``add_task`` from the supervisor's done callback. The new task was
created against the dying event loop and never got to run, leaking
as ``Task pending ... running at usecase.py:142`` warnings at GC time.

Fix: read the client lazily inside the loop body and ``return`` cleanly
when it's None. A clean exit doesn't invoke the supervisor's restart
path. Also pass ``engine`` to ``_open_listen_connection`` directly so
the listener-setup path doesn't go back through ``self._client`` either.
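The restart policy the fix relies on can be demonstrated in isolation. A sketch of the rule described above (``should_restart`` is a hypothetical stand-in for FastStream's ``TaskCallbackSupervisor`` decision, not its actual code): a task that raises restarts, a task that returns cleanly does not.

```python
import asyncio

def should_restart(task: asyncio.Task) -> bool:
    # Supervisor rule (sketch): restart only on a non-CancelledError
    # exception. A clean return or a cancellation does not restart.
    if task.cancelled():
        return False
    return task.exception() is not None

async def demo():
    async def clean_exit():
        return None  # what _fetch_loop now does when the client is None

    async def crash():
        raise RuntimeError("client gone")  # the old failure mode

    t1 = asyncio.ensure_future(clean_exit())
    t2 = asyncio.ensure_future(crash())
    await asyncio.gather(t1, t2, return_exceptions=True)
    return should_restart(t1), should_restart(t2)

print(asyncio.run(demo()))  # → (False, True)
```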

The warnings only appeared when running all three test files together;
each pair was clean. The trio's longer test stream just changed GC
timing enough to surface what was always a latent bug.
Drop two unreachable lines (the redundant empty-queue guard inside
fetch_with_conn and the dead error_attempt reset that only ran on a
shutdown-driven inner-loop return). Cover the LISTEN fallback paths
with unit tests and replace the brittle pg_stat_activity probe in
test_fetch_uses_persistent_connection with a direct mock.patch.object
of fetch_with_conn that captures ``pg_backend_pid()`` per call -- this
directly proves the persistent-connection claim and gets line
coverage for free.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
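The spy pattern the second commit describes (wrap the real method, record the backend PID per call, assert all calls saw the same backend) can be sketched with stdlib mocks. Everything here is a hypothetical stand-in: `FakeConn`, `Subscriber`, and `backend_pid` are illustrative, not the library's classes; only the `mock.patch.object` + per-call PID capture strategy comes from the commit message.

```python
from unittest import mock

class FakeConn:
    def __init__(self, pid: int):
        self._pid = pid

    def backend_pid(self) -> int:
        return self._pid  # stand-in for SELECT pg_backend_pid()

class Subscriber:
    def __init__(self, conn: FakeConn):
        self._conn = conn

    def fetch_with_conn(self, conn: FakeConn) -> list:
        return []  # the real version runs the fetch CTE on this connection

def test_persistent_connection() -> list:
    sub = Subscriber(FakeConn(4242))
    seen = []
    real = sub.fetch_with_conn  # capture the unpatched bound method

    def spy(conn):
        seen.append(conn.backend_pid())  # record the backend per call
        return real(conn)

    with mock.patch.object(sub, "fetch_with_conn", side_effect=spy):
        for _ in range(3):
            sub.fetch_with_conn(sub._conn)

    # One distinct PID across calls => the same persistent connection.
    assert len(set(seen)) == 1
    return seen

print(test_persistent_connection())  # → [4242, 4242, 4242]
```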
@lesnik512 lesnik512 merged commit d774522 into main May 7, 2026
3 checks passed
@lesnik512 lesnik512 deleted the feat/listen-notify-persistent-conn branch May 7, 2026 12:18