feat: persistent fetch connection + LISTEN/NOTIFY wakeups #4
Merged
Two perf items collapse into one architectural change. The subscriber's fetch loop now owns:

1. A long-lived `AsyncConnection` used for the fetch CTE, replacing the per-call `async with engine.begin()` pool checkout.
2. A separate raw asyncpg connection running `LISTEN outbox_<table>`, driven via asyncpg's native `add_listener`. A dedicated connection is required because asyncpg's listener task monopolizes its socket; mixing transactional queries on the same connection would break notification delivery.

`broker.publish` and `publish_batch` now emit `SELECT pg_notify('outbox_<table>', queue)` after the INSERT, on the caller's session. NOTIFY is transactional in Postgres, so the listener only sees it after the user's transaction commits: atomicity with the row insert is automatic, and rollbacks silently drop the NOTIFY.

The fetch loop's idle sleep is short-circuited by an `asyncio.Event` that the LISTEN callback sets. Idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error, network blip), the subscriber logs once and degrades gracefully to today's polling behavior. On any DB error inside the fetch loop, both connections are closed, the loop backs off exponentially, then both reopen.

The test broker (`FakeOutboxClient`) signals "no real engine" by returning `None` from `client.engine`; the subscriber takes a polling-only path and never opens persistent connections or LISTENs. Existing fake tests are untouched. New integration tests cover NOTIFY-driven wakeup, NOTIFY payload contents, and persistent-connection reuse.

Pool sizing implication for users: each active subscriber holds 2 connections (1 from the SQLAlchemy pool, 1 raw asyncpg), so size `pool_size >= num_subscribers + max_workers` (the raw asyncpg connection is outside the pool).
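The event-driven wakeup can be sketched in isolation without a database. This is a minimal simulation, not the PR's actual code: `idle_wait`, `simulated_notify`, and the 10ms timer are illustrative; in the real subscriber, asyncpg's `add_listener` callback is what sets the event.

```python
import asyncio
import time

MAX_FETCH_INTERVAL = 10.0  # default polling interval from the PR description

async def idle_wait(wakeup: asyncio.Event) -> float:
    """One idle cycle: sleep up to MAX_FETCH_INTERVAL, but return as soon
    as the LISTEN callback sets the event (i.e. a NOTIFY arrived)."""
    start = time.monotonic()
    try:
        await asyncio.wait_for(wakeup.wait(), timeout=MAX_FETCH_INTERVAL)
    except asyncio.TimeoutError:
        pass  # no NOTIFY in this window; fall through to a normal poll
    wakeup.clear()
    return time.monotonic() - start

async def main() -> float:
    wakeup = asyncio.Event()
    # In the real subscriber, conn.add_listener('outbox_<table>', cb) invokes
    # cb on NOTIFY, and cb calls wakeup.set(). Here a timer simulates it.
    async def simulated_notify() -> None:
        await asyncio.sleep(0.01)
        wakeup.set()
    notify_task = asyncio.ensure_future(simulated_notify())
    elapsed = await idle_wait(wakeup)
    await notify_task
    return elapsed

elapsed = asyncio.run(main())
print(f"woke after {elapsed:.3f}s instead of up to {MAX_FETCH_INTERVAL}s")
```

The same `idle_wait` gives polling-only behavior for free when LISTEN setup fails: if nothing ever sets the event, `wait_for` times out at `max_fetch_interval` and the loop polls exactly as before.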
After the LISTEN/NOTIFY refactor, `_fetch_loop` started with `engine = self._client.engine`. The `self._client` property raises `RuntimeError` when the broker has no client, which happens during `TestOutboxBroker` teardown, when `mock.patch.object(broker, "client", ...)` reverses and `broker.config.broker_config.client` goes back to `None`.

FastStream's `TaskCallbackSupervisor` (subscriber/supervisor.py) auto-restarts any task whose coroutine raises a non-CancelledError exception. So the dying `_fetch_loop` task triggered a fresh `add_task` from the supervisor's done callback. The new task was created against the dying event loop and never got to run, leaking as `Task pending ... running at usecase.py:142` warnings at GC time.

Fix: read the client lazily inside the loop body and `return` cleanly when it's `None`. A clean exit doesn't invoke the supervisor's restart path. Also pass `engine` to `_open_listen_connection` directly, so the listener-setup path doesn't go back through `self._client` either.

The warnings only appeared when running all three test files together; each pair was clean. The trio's longer test stream just changed GC timing enough to surface what was always a latent bug.
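The raise-vs-clean-return distinction is the whole fix, so here is a toy reproduction. The `supervise` function below models the restart-on-exception behavior the PR attributes to FastStream's `TaskCallbackSupervisor`; all names are illustrative, not FastStream's API.

```python
import asyncio

restarts: list[BaseException] = []

def supervise(coro_factory) -> asyncio.Task:
    """Toy restart-on-exception supervisor: a done callback that re-spawns
    the task whenever it died with a non-CancelledError exception."""
    task = asyncio.ensure_future(coro_factory())
    def on_done(t: asyncio.Task) -> None:
        if not t.cancelled() and t.exception() is not None:
            restarts.append(t.exception())
            supervise(coro_factory)  # the restart path the buggy loop kept hitting
    task.add_done_callback(on_done)
    return task

attempts = {"n": 0}

async def fetch_loop() -> None:
    attempts["n"] += 1
    if attempts["n"] == 1:
        # Old behavior: self._client raised RuntimeError during teardown,
        # so the supervisor's done callback restarted the task.
        raise RuntimeError("client is None during teardown")
    # Fixed behavior: read the client lazily, see None, and return cleanly;
    # a clean exit never triggers the restart path.
    return

async def main() -> None:
    supervise(fetch_loop)
    await asyncio.sleep(0.05)  # let the raise/restart cycle settle

asyncio.run(main())
print(attempts["n"], len(restarts))  # → 2 1: one restart, then a clean stop
```

The first run raises and is restarted once; the second run exits cleanly, so `on_done` sees no exception and the chain stops, which is exactly why the lazy `None` check ends the leak.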
Drop two unreachable lines (the redundant empty-queue guard inside `fetch_with_conn`, and the dead `error_attempt` reset that only ran on a shutdown-driven inner-loop return). Cover the LISTEN fallback paths with unit tests, and replace the brittle `pg_stat_activity` probe in `test_fetch_uses_persistent_connection` with a direct `mock.patch.object` of `fetch_with_conn` that captures `pg_backend_pid()` per call: this strictly proves the persistent-connection claim and gets line coverage for free. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>