Merged
2 changes: 1 addition & 1 deletion CLAUDE.md
@@ -30,7 +30,7 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr

### User-owned schema

-`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ).
+`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare the partial index `(queue, next_attempt_at) WHERE acquired_token IS NULL` on the table itself, so Alembic autogenerate brings it up. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ).
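Reviewer note: the availability predicate described above can be sketched in plain Python to make the two branches explicit (a hedged sketch, not package code; names mirror the outbox columns):

```python
# Sketch only: the "available" predicate from the docs above, in plain Python.
# A row is available iff its lease is unset OR its lease has expired.
from datetime import datetime, timedelta, timezone
from typing import Optional
from uuid import UUID


def is_available(
    acquired_token: Optional[UUID],
    acquired_at: Optional[datetime],
    lease_ttl_seconds: float = 60.0,
    now: Optional[datetime] = None,
) -> bool:
    now = now or datetime.now(timezone.utc)
    if acquired_token is None:
        return True  # lease unset
    # Lease expired: acquired_at < now() - lease_ttl_seconds
    return acquired_at is not None and acquired_at < now - timedelta(
        seconds=lease_ttl_seconds
    )
```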

### Two-loop subscriber (`subscriber/usecase.py`)

9 changes: 0 additions & 9 deletions README.md
@@ -49,15 +49,6 @@ The `acquired_token` is critical: a slow handler whose lease expired and was re-

`lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery.
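Reviewer note: to make "P99 with margin" concrete, here is a hypothetical sizing helper (not part of this package; `suggest_lease_ttl` and the margin of 3 are assumptions for illustration):

```python
# Hypothetical helper: derive a lease_ttl_seconds value from observed handler
# durations, leaving headroom so healthy in-flight handlers never race expiry.
import statistics


def suggest_lease_ttl(handler_durations_s: list[float], margin: float = 3.0) -> float:
    # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile.
    p99 = statistics.quantiles(handler_durations_s, n=100)[98]
    return p99 * margin
```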

-## Recommended index
-
-Add this to your Alembic migration alongside the table:
-
-```sql
-CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at)
-    WHERE acquired_token IS NULL;
-```

## Schema validation

Schema validation is opt-in:
30 changes: 17 additions & 13 deletions faststream_outbox/schema.py
@@ -1,12 +1,10 @@
"""
Outbox table factory.

-The package does not own the schema. Users attach the returned ``Table`` to their own
-``MetaData`` and write Alembic migrations themselves. Recommended companion partial
-index (create it in your migration alongside the table)::
-
-    CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at)
-        WHERE acquired_token IS NULL;
+The package does not own the schema — users attach the returned ``Table`` to their own
+``MetaData`` and write Alembic migrations themselves — but the partial index that the
+fetch query relies on **is** declared on the table, so Alembic autogenerate picks it up
+and users can't forget it.

A row is "available" iff its lease is unset (``acquired_token IS NULL``) or its lease
is expired (``acquired_at < now() - lease_ttl_seconds``). The fetch query reclaims
@@ -35,19 +33,16 @@

def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table:
"""
-    Build the outbox ``Table`` and attach it to *metadata*.
+    Build the outbox ``Table`` (with the partial fetch index) and attach it to *metadata*.

The user wires the returned table into their own SQLAlchemy ``MetaData`` so it is
discovered by Alembic's autogenerate. They are responsible for the actual migration.
-
-    The recommended composite partial index for fetch performance is documented in the
-    module docstring above; create it explicitly in your migration.
    """
-    return Table(
+    table = Table(
table_name,
metadata,
Column("id", BigInteger, primary_key=True, autoincrement=True),
-        Column("queue", String(255), nullable=False, index=True),
+        Column("queue", String(255), nullable=False),
Column("payload", LargeBinary, nullable=False),
Column("headers", JSONB, nullable=True),
Column("attempts_count", BigInteger, nullable=False, server_default="0"),
@@ -58,5 +53,14 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table
Column("last_attempt_at", DateTime(timezone=True), nullable=True),
Column("acquired_at", DateTime(timezone=True), nullable=True),
Column("acquired_token", Uuid, nullable=True),
-        Index(f"{table_name}_next_attempt_at_idx", "next_attempt_at"),
)
+    # Partial index that backs the fetch query's hot branch
+    # (`WHERE acquired_token IS NULL AND queue = ? AND next_attempt_at <= now()`).
+    # Lease-expired rows fall back to a sequential scan, which is fine — they're rare.
+    Index(
+        f"{table_name}_pending_idx",
+        table.c.queue,
+        table.c.next_attempt_at,
+        postgresql_where=table.c.acquired_token.is_(None),
+    )
+    return table
11 changes: 6 additions & 5 deletions tests/conftest.py
@@ -27,16 +27,17 @@ async def pg_engine() -> AsyncIterator[AsyncEngine]:

@pytest.fixture
async def outbox_table(pg_engine: AsyncEngine) -> AsyncIterator[Table]:
-    """Per-test outbox table with a unique name + the recommended partial index."""
+    """
+    Per-test outbox table.
+
+    The partial fetch index is declared on the Table itself, so ``create_all``
+    brings it up alongside the table.
+    """
metadata = MetaData()
table_name = f"test_outbox_{uuid.uuid4().hex[:12]}"
table = make_outbox_table(metadata, table_name=table_name)
async with pg_engine.begin() as conn:
await conn.run_sync(metadata.create_all)
-        await conn.exec_driver_sql(
-            f'CREATE INDEX "{table_name}_pending_idx" ON "{table_name}" '
-            f"(queue, next_attempt_at) WHERE acquired_token IS NULL"
-        )
yield table
async with pg_engine.begin() as conn:
await conn.run_sync(metadata.drop_all)