From f037d19af37d84a0ddaa21c91da9ed15a4f6afc0 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 13:30:12 +0300 Subject: [PATCH 1/2] perf: declare fetch partial index inside make_outbox_table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fetch query's hot branch (`WHERE acquired_token IS NULL AND queue = ? AND next_attempt_at <= now()`) needs a composite partial index to avoid a sequential scan. Until now this was a recommendation in the README that users had to add to their Alembic migration manually — easy to forget, silently catastrophic for throughput. Declare the index on the Table itself so Alembic autogenerate brings it up alongside the table: CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at) WHERE acquired_token IS NULL; Drop the now-redundant standalone indexes: - `queue` (`index=True` on the column) — covered by the composite. - `outbox_next_attempt_at_idx` — same. Lease-expired rows fall back to a sequential scan when fetch reclaims them; that's fine because expired leases are rare. --- CLAUDE.md | 2 +- README.md | 6 ++++-- faststream_outbox/schema.py | 30 +++++++++++++++++------------- tests/conftest.py | 11 ++++++----- 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index afc6767..ae992a3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,7 +30,7 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr ### User-owned schema -`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ). +`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare the partial index `(queue, next_attempt_at) WHERE acquired_token IS NULL` on the table itself, so Alembic autogenerate brings it up. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ). ### Two-loop subscriber (`subscriber/usecase.py`) diff --git a/README.md b/README.md index 98896e9..95d632f 100644 --- a/README.md +++ b/README.md @@ -49,15 +49,17 @@ The `acquired_token` is critical: a slow handler whose lease expired and was re- `lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery. -## Recommended index +## Index -Add this to your Alembic migration alongside the table: +`make_outbox_table` declares the partial index that the fetch query relies on: ```sql CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at) WHERE acquired_token IS NULL; ``` +Alembic autogenerate picks it up alongside the table — no separate migration step needed. Lease-expired rows fall back to a sequential scan when the fetch query reclaims them, which is fine because expired leases are rare. + ## Schema validation Schema validation is opt-in: diff --git a/faststream_outbox/schema.py b/faststream_outbox/schema.py index 7134a17..a41e79b 100644 --- a/faststream_outbox/schema.py +++ b/faststream_outbox/schema.py @@ -1,12 +1,10 @@ """ Outbox table factory. -The package does not own the schema. Users attach the returned ``Table`` to their own -``MetaData`` and write Alembic migrations themselves. Recommended companion partial -index (create it in your migration alongside the table):: - - CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at) - WHERE acquired_token IS NULL; +The package does not own the schema — users attach the returned ``Table`` to their own +``MetaData`` and write Alembic migrations themselves — but the partial index that the +fetch query relies on **is** declared on the table, so Alembic autogenerate picks it up +and users can't forget it. A row is "available" iff its lease is unset (``acquired_token IS NULL``) or its lease is expired (``acquired_at < now() - lease_ttl_seconds``). The fetch query reclaims @@ -35,19 +33,16 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table: """ - Build the outbox ``Table`` and attach it to *metadata*. + Build the outbox ``Table`` (with the partial fetch index) and attach it to *metadata*. The user wires the returned table into their own SQLAlchemy ``MetaData`` so it is discovered by Alembic's autogenerate. They are responsible for the actual migration. - - The recommended composite partial index for fetch performance is documented in the - module docstring above; create it explicitly in your migration. """ - return Table( + table = Table( table_name, metadata, Column("id", BigInteger, primary_key=True, autoincrement=True), - Column("queue", String(255), nullable=False, index=True), + Column("queue", String(255), nullable=False), Column("payload", LargeBinary, nullable=False), Column("headers", JSONB, nullable=True), Column("attempts_count", BigInteger, nullable=False, server_default="0"), @@ -58,5 +53,14 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table Column("last_attempt_at", DateTime(timezone=True), nullable=True), Column("acquired_at", DateTime(timezone=True), nullable=True), Column("acquired_token", Uuid, nullable=True), - Index(f"{table_name}_next_attempt_at_idx", "next_attempt_at"), ) + # Partial index that backs the fetch query's hot branch + # (`WHERE acquired_token IS NULL AND queue = ? AND next_attempt_at <= now()`). + # Lease-expired rows fall back to a sequential scan, which is fine — they're rare. + Index( + f"{table_name}_pending_idx", + table.c.queue, + table.c.next_attempt_at, + postgresql_where=table.c.acquired_token.is_(None), + ) + return table diff --git a/tests/conftest.py b/tests/conftest.py index 84d5c8f..3bfc8c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,16 +27,17 @@ async def pg_engine() -> AsyncIterator[AsyncEngine]: @pytest.fixture async def outbox_table(pg_engine: AsyncEngine) -> AsyncIterator[Table]: - """Per-test outbox table with a unique name + the recommended partial index.""" + """ + Per-test outbox table. + + The partial fetch index is declared on the Table itself, so ``create_all`` + brings it up alongside the table. + """ metadata = MetaData() table_name = f"test_outbox_{uuid.uuid4().hex[:12]}" table = make_outbox_table(metadata, table_name=table_name) async with pg_engine.begin() as conn: await conn.run_sync(metadata.create_all) - await conn.exec_driver_sql( - f'CREATE INDEX "{table_name}_pending_idx" ON "{table_name}" ' - f"(queue, next_attempt_at) WHERE acquired_token IS NULL" - ) yield table async with pg_engine.begin() as conn: await conn.run_sync(metadata.drop_all) From c3d0b80645235ccd71e11c19106e9f8afd9c7500 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 7 May 2026 13:35:52 +0300 Subject: [PATCH 2/2] docs: drop index section from README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The partial index is declared inside make_outbox_table and brought up by Alembic autogenerate — users don't need to know it exists. --- README.md | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/README.md b/README.md index 95d632f..85e0be6 100644 --- a/README.md +++ b/README.md @@ -49,17 +49,6 @@ The `acquired_token` is critical: a slow handler whose lease expired and was re- `lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery. -## Index - -`make_outbox_table` declares the partial index that the fetch query relies on: - -```sql -CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at) - WHERE acquired_token IS NULL; -``` - -Alembic autogenerate picks it up alongside the table — no separate migration step needed. Lease-expired rows fall back to a sequential scan when the fetch query reclaims them, which is fine because expired leases are rare. - ## Schema validation Schema validation is opt-in: