diff --git a/CLAUDE.md b/CLAUDE.md index c5ec16c..3acc4ed 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project -`faststream-outbox` is a FastStream broker integration that uses a Postgres table as the message queue (transactional outbox pattern). Postgres-only at v0; polling-only (no LISTEN/NOTIFY). +`faststream-outbox` is a FastStream broker integration that uses a Postgres table as the message queue (transactional outbox pattern). Postgres-only at v0. Subscribers poll the table and use LISTEN/NOTIFY to short-circuit idle waits. ## Commands @@ -22,12 +22,22 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr ### Producer side -`broker.publish(body, *, queue, session, headers=None, correlation_id=None)` and `broker.publish_batch(*bodies, queue, session, ...)` insert outbox rows through the caller's `AsyncSession` (`session.execute(insert(table).values(...))`). They do **not** flush, commit, or open their own transaction — the row must commit with the caller's domain writes. Both reject anything that is not an `AsyncSession` with `TypeError`. +`broker.publish(body, *, queue, session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None)` and `broker.publish_batch(*bodies, queue, session, headers=None, activate_in=None, activate_at=None)` insert outbox rows through the caller's `AsyncSession`. They do **not** flush, commit, or open their own transaction — the row must commit with the caller's domain writes. Both reject anything that is not an `AsyncSession` with `TypeError`. `publish` returns the inserted row's id (or `None` on `timer_id` conflict); `publish_batch` returns nothing and does not accept `timer_id` (per-row dedup makes no sense in a batch). `broker.request` raises `NotImplementedError` (outbox is fire-and-forget). `OutboxRegistrator.publisher` also raises. The `_NoProducer` stub exists only to satisfy FastStream's broker producer slot. `_encode_payload` (in `envelope.py`) is the internal helper that turns `body` into `(payload_bytes, headers_dict)`. Not exported. +### Timers (delayed delivery) + +`activate_in: timedelta` / `activate_at: datetime` (mutually exclusive) set `next_attempt_at` so the row is invisible to fetch until the gate opens — the `next_attempt_at <= now()` predicate in the fetch CTE is what gates eligibility, so no subscriber-side change is needed for scheduling. For `publish`, `next_attempt_at` is computed server-side via `now() + make_interval(secs => :s)` to stay clock-skew-safe; for `publish_batch` it's client-side (`datetime.now(UTC) + activate_in`) because executemany doesn't compose cleanly with column-level SQL expressions, and the few-ms drift is harmless for user-supplied scheduling. + +`timer_id` (single `publish` only) flows into a `String(255)` column with a partial unique index on `(queue, timer_id) WHERE timer_id IS NOT NULL`. The producer switches to `pg_insert(...).on_conflict_do_nothing(index_elements=[queue, timer_id], index_where=timer_id IS NOT NULL)` so re-publishing the same id is a silent no-op (returns `None`). NOTIFY is skipped when `activate_in`/`activate_at` is set OR the conflict path returned no row — both cases would either wake listeners that find nothing, or wake them prematurely. + +`broker.cancel_timer(*, queue, timer_id, session)` issues `DELETE WHERE queue=? AND timer_id=? 
AND acquired_token IS NULL` on the caller's session — the `acquired_token IS NULL` guard is load-bearing: it preserves the lease-token invariant by refusing to clobber a row whose handler is already in flight (returns `False` in that case; the delivery completes normally). + +Latency floor: timer firing latency is bounded by `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. NOTIFY does not help here — listeners can't act on a future row. Sub-second precision is not a goal of this broker. + ### User-owned schema `make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare the partial index `(queue, next_attempt_at) WHERE acquired_token IS NULL` on the table itself, so Alembic autogenerate brings it up. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ). @@ -38,7 +48,7 @@ Per subscriber: 1. **`_fetch_loop`** — owns a long-lived `AsyncConnection` for the fetch CTE and a separate raw asyncpg connection for `LISTEN outbox_`. Single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` — idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error), the loop logs once and falls back to polling. On any DB error the connections are closed, the loop backs off exponentially (capped by `_BACKOFF_EXP_CAP=30`), and reopens. Test broker (no real engine) skips the persistent-connection / LISTEN path entirely and uses `client.fetch(...)` per iteration. 2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`. -Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_
<table_name>', queue)` on the caller's session right after the INSERT. NOTIFY is transactional — listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.
+Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_<table_name>
', queue)` on the caller's session right after the INSERT, **except** when the row is future-dated (`activate_in`/`activate_at` set) or a `timer_id` conflict made the insert a no-op — both cases skip NOTIFY since listeners can't act on the result. NOTIFY is transactional: listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY. Channel naming convention: `outbox_`. Postgres limits identifiers to 63 chars, so users with table names longer than ~56 chars will silently lose the NOTIFY wake-up and degrade to polling. diff --git a/README.md b/README.md index 12e2292..7b61b39 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,9 @@ async with session_factory() as session, session.begin(): `make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` that you attach to your own `MetaData` and migrate via Alembic. The package does **not** own your schema; it only describes the columns it needs. -`broker.publish(body, *, queue, session, headers=None, correlation_id=None)` inserts one outbox row through the caller's `AsyncSession`. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an `async with session.begin():` block. +`broker.publish(body, *, queue, session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None)` inserts one outbox row through the caller's `AsyncSession`. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an `async with session.begin():` block. See [Timers](#timers-delayed-delivery) for `activate_in` / `activate_at` / `timer_id`. -`broker.publish_batch(*bodies, queue, session, headers=None)` inserts many rows in a single round-trip with the same transactional contract. +`broker.publish_batch(*bodies, queue, session, headers=None, activate_in=None, activate_at=None)` inserts many rows in a single round-trip with the same transactional contract. A subscriber owns two async loops: @@ -49,6 +49,40 @@ The `acquired_token` is critical: a slow handler whose lease expired and was re- `lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery. +## Timers (delayed delivery) + +Schedule a publish to fire later by passing `activate_in` (relative) or `activate_at` (absolute, tz-aware) — exactly one. Pass `timer_id` to deduplicate per `(queue, timer_id)`; cancel a not-yet-leased timer with `broker.cancel_timer(...)`. + +```python +import datetime as dt + +# Fire 30 seconds from now, deduplicated by timer_id: +await broker.publish( + {"order_id": 1}, + queue="orders", + session=session, + activate_in=dt.timedelta(seconds=30), + timer_id=f"order-confirm-{order.id}", +) + +# Fire at a specific UTC instant: +await broker.publish( + {"x": 1}, queue="orders", session=session, + activate_at=dt.datetime(2026, 6, 1, 9, tzinfo=dt.UTC), +) + +# Cancel before it fires (no-op if the row is already in flight): +await broker.cancel_timer(queue="orders", timer_id="order-confirm-42", session=session) +``` + +`publish` returns the inserted row's `id`, or `None` if a row with the same `(queue, timer_id)` already exists. 
`cancel_timer` returns `True` if it deleted the row; `False` means either the timer didn't exist or was already leased to a worker (in which case delivery completes normally).
+
+`publish_batch` accepts `activate_in` / `activate_at` to schedule every row in the batch identically — but no `timer_id` (per-row dedup makes no sense for a batch).
+
+**Latency floor:** firing latency is bounded by the subscriber's `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. Lower `max_fetch_interval` for sub-10s precision; sub-second precision is not a goal of this broker.
+
+**Migration note:** existing deployments must regenerate Alembic migrations after upgrading — the new `timer_id` column and `<table_name>
_timer_id_uq` partial unique index need to land in the database before publish-with-`timer_id` works. + ## Schema validation Schema validation is opt-in: diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index 5c176e1..2100466 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -6,6 +6,7 @@ owns subscribers on the consumer side. """ +import datetime as _dt import logging import typing from collections.abc import Iterable, Sequence @@ -22,7 +23,8 @@ from faststream._internal.types import BrokerMiddleware, CustomCallable from faststream.specification.schema import BrokerSpec from faststream.specification.schema.extra import Tag, TagDict -from sqlalchemy import insert, text +from sqlalchemy import Float, bindparam, delete, func, insert, text +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from faststream_outbox.client import OutboxClient @@ -186,7 +188,7 @@ async def validate_schema(self) -> None: """Validate the user's table matches what the package expects. Opt-in.""" await self.client.validate_schema() - async def publish( # ty: ignore[invalid-method-override] + async def publish( # ty: ignore[invalid-method-override] # noqa: PLR0913 self, body: typing.Any, *, @@ -194,7 +196,10 @@ async def publish( # ty: ignore[invalid-method-override] session: AsyncSession, headers: dict[str, str] | None = None, correlation_id: str | None = None, - ) -> None: + activate_in: _dt.timedelta | None = None, + activate_at: _dt.datetime | None = None, + timer_id: str | None = None, + ) -> int | None: """ Insert one outbox row using *session*'s open transaction. @@ -202,13 +207,56 @@ async def publish( # ty: ignore[invalid-method-override] ``async with session.begin():`` block). ``publish`` does not flush, commit, or open its own transaction — that is the whole point of the transactional outbox pattern: the row commits atomically with the caller's domain writes. + + Schedule a delayed delivery by passing exactly one of *activate_in* (relative) + or *activate_at* (absolute, tz-aware). Pass *timer_id* to deduplicate per + ``(queue, timer_id)`` — re-publishing with the same id is a no-op (returns + ``None``). Cancel a not-yet-leased timer with :meth:`cancel_timer`. + + Returns the inserted row's ``id`` (BigInt PK), or ``None`` if a timer with + the same ``(queue, timer_id)`` already exists. """ if not isinstance(session, AsyncSession): msg = "broker.publish requires an sqlalchemy.ext.asyncio.AsyncSession" raise TypeError(msg) + if activate_in is not None and activate_at is not None: + msg = "broker.publish accepts at most one of activate_in / activate_at" + raise ValueError(msg) payload, hdrs = _encode_payload(body, headers=headers, correlation_id=correlation_id) - await session.execute(insert(self._outbox_table).values(queue=queue, payload=payload, headers=hdrs)) - await self._notify(session, queue) + t = self._outbox_table + values: dict[str, typing.Any] = {"queue": queue, "payload": payload, "headers": hdrs} + # Server-side compute keeps timing immune to worker/DB clock skew (mirrors + # client.mark_pending_with_lease). 
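+        # make_interval's six leading zeros are its years/months/weeks/days/hours/mins
+        # arguments; only the secs bindparam carries the delay, so next_attempt_at is
+        # now() + activate_in as seen by Postgres's clock, never the app's.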
+ if activate_in is not None: + values["next_attempt_at"] = func.now() + func.make_interval( + 0, 0, 0, 0, 0, 0, bindparam("activate_in_seconds", activate_in.total_seconds(), type_=Float) + ) + elif activate_at is not None: + values["next_attempt_at"] = activate_at + if timer_id is not None: + values["timer_id"] = timer_id + is_future = activate_in is not None or activate_at is not None + + if timer_id is not None: + stmt = ( + pg_insert(t) + .values(**values) + .on_conflict_do_nothing( + index_elements=[t.c.queue, t.c.timer_id], + index_where=t.c.timer_id.is_not(None), + ) + .returning(t.c.id) + ) + else: + stmt = insert(t).values(**values).returning(t.c.id) + + result = await session.execute(stmt) + row_id: int | None = result.scalar() + # Skip NOTIFY for future-dated rows (listeners can't act before the gate + # opens — polling fires them at next tick) and on conflict (no row landed). + if row_id is not None and not is_future: + await self._notify(session, queue) + return row_id async def publish_batch( # ty: ignore[invalid-method-override] self, @@ -216,24 +264,70 @@ async def publish_batch( # ty: ignore[invalid-method-override] queue: str, session: AsyncSession, headers: dict[str, str] | None = None, + activate_in: _dt.timedelta | None = None, + activate_at: _dt.datetime | None = None, ) -> None: """ Insert multiple outbox rows via *session*. Same transactional contract as ``publish``. Each row gets its own auto-generated ``correlation_id``; pass *headers* to - share static headers across all rows. + share static headers across all rows. *activate_in* / *activate_at* schedule + every row in the batch identically — per-row timer dedup is not supported, + use :meth:`publish` for that. """ if not isinstance(session, AsyncSession): msg = "broker.publish_batch requires an sqlalchemy.ext.asyncio.AsyncSession" raise TypeError(msg) + if activate_in is not None and activate_at is not None: + msg = "broker.publish_batch accepts at most one of activate_in / activate_at" + raise ValueError(msg) if not bodies: return + # Client-side time for batch: executemany doesn't compose with column-level + # SQL expressions easily, and a few-ms drift versus the DB is harmless for + # user-supplied scheduling. (Retries still use server time via mark_pending_with_lease.) + next_at: _dt.datetime | None = None + if activate_in is not None: + next_at = _dt.datetime.now(tz=_dt.UTC) + activate_in + elif activate_at is not None: + next_at = activate_at rows = [] for body in bodies: payload, hdrs = _encode_payload(body, headers=headers) - rows.append({"queue": queue, "payload": payload, "headers": hdrs}) + row: dict[str, typing.Any] = {"queue": queue, "payload": payload, "headers": hdrs} + if next_at is not None: + row["next_attempt_at"] = next_at + rows.append(row) await session.execute(insert(self._outbox_table), rows) - await self._notify(session, queue) + if next_at is None: + await self._notify(session, queue) + + async def cancel_timer( + self, + *, + queue: str, + timer_id: str, + session: AsyncSession, + ) -> bool: + """ + Delete a not-yet-leased timer row. Returns True if a row was deleted. + + Same transactional contract as :meth:`publish` — runs on the caller's session + and commits with their transaction. The ``acquired_token IS NULL`` guard + prevents canceling a row whose handler is already in flight: that returns + False and the delivery completes normally. 
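+
+        A minimal usage sketch (assumes ``session_factory`` is the caller's
+        ``async_sessionmaker``)::
+
+            async with session_factory() as session, session.begin():
+                cancelled = await broker.cancel_timer(
+                    queue="orders", timer_id="order-confirm-42", session=session
+                )
+                if not cancelled:
+                    ...  # never existed, or already leased (delivery completes normally)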
+ """ + if not isinstance(session, AsyncSession): + msg = "broker.cancel_timer requires an sqlalchemy.ext.asyncio.AsyncSession" + raise TypeError(msg) + t = self._outbox_table + stmt = delete(t).where( + t.c.queue == queue, + t.c.timer_id == timer_id, + t.c.acquired_token.is_(None), + ) + result = await session.execute(stmt) + return (result.rowcount or 0) > 0 # ty: ignore[unresolved-attribute] async def _notify(self, session: AsyncSession, queue: str) -> None: """ diff --git a/faststream_outbox/schema.py b/faststream_outbox/schema.py index a41e79b..3c258bd 100644 --- a/faststream_outbox/schema.py +++ b/faststream_outbox/schema.py @@ -53,6 +53,7 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table Column("last_attempt_at", DateTime(timezone=True), nullable=True), Column("acquired_at", DateTime(timezone=True), nullable=True), Column("acquired_token", Uuid, nullable=True), + Column("timer_id", String(255), nullable=True), ) # Partial index that backs the fetch query's hot branch # (`WHERE acquired_token IS NULL AND queue = ? AND next_attempt_at <= now()`). @@ -63,4 +64,14 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table table.c.next_attempt_at, postgresql_where=table.c.acquired_token.is_(None), ) + # Partial unique index that backs `publish(..., timer_id=...)`'s ON CONFLICT DO NOTHING + # and the (queue, timer_id) lookup in `cancel_timer`. Only enforced when timer_id is set, + # so non-timer rows remain unconstrained. + Index( + f"{table_name}_timer_id_uq", + table.c.queue, + table.c.timer_id, + unique=True, + postgresql_where=table.c.timer_id.is_not(None), + ) return table diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 68f9f70..e170804 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -43,6 +43,7 @@ class _FakeRow: last_attempt_at: _dt.datetime | None = None acquired_at: _dt.datetime | None = None acquired_token: uuid.UUID | None = None + timer_id: str | None = None class FakeOutboxClient: @@ -59,13 +60,19 @@ def feed( payload: bytes, headers: dict[str, str] | None = None, next_attempt_at: _dt.datetime | None = None, - ) -> int: + timer_id: str | None = None, + ) -> int | None: + # Mirror the real client's partial-unique-on-(queue, timer_id) behavior: + # re-feeding a timer that already exists is a no-op. + if timer_id is not None and any(r.queue == queue and r.timer_id == timer_id for r in self._rows): + return None row = _FakeRow( id=self._next_id, queue=queue, payload=payload, headers=headers, next_attempt_at=next_attempt_at or _utcnow(), + timer_id=timer_id, ) self._rows.append(row) self._next_id += 1 @@ -153,6 +160,14 @@ async def mark_pending_with_lease( # noqa: PLR0913 return True return False + async def cancel_timer(self, *, queue: str, timer_id: str) -> bool: + """Mirror :meth:`OutboxBroker.cancel_timer` — drop a not-yet-leased timer row.""" + for i, row in enumerate(self._rows): + if row.queue == queue and row.timer_id == timer_id and row.acquired_token is None: + del self._rows[i] + return True + return False + async def validate_schema(self) -> None: return @@ -193,13 +208,15 @@ def feed( *, headers: dict[str, str] | None = None, next_attempt_at: _dt.datetime | None = None, - ) -> int: - """Insert a row directly into the in-memory store. Returns the row id.""" + timer_id: str | None = None, + ) -> int | None: + """Insert a row directly into the in-memory store. 
Returns the row id, or None on timer_id conflict.""" return self.fake_client.feed( queue=queue, payload=payload, headers=headers, next_attempt_at=next_attempt_at, + timer_id=timer_id, ) @contextmanager diff --git a/tests/test_fake.py b/tests/test_fake.py index a061820..0787207 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -591,3 +591,86 @@ async def test_fake_client_fetch_with_conn_mirrors_fetch() -> None: assert len(rows) == 1 assert rows[0].queue == "q" assert rows[0].payload == b"x" + + +async def test_fake_client_feed_timer_id_dedup() -> None: + fake = FakeOutboxClient() + first = fake.feed(queue="q", payload=b"x", timer_id="email-1") + second = fake.feed(queue="q", payload=b"y", timer_id="email-1") + assert first is not None + assert second is None # second feed is a no-op + assert len(fake.rows) == 1 + assert fake.rows[0].payload == b"x" # first wins + + +async def test_fake_client_feed_timer_id_different_queues_allowed() -> None: + """Unique-on-(queue, timer_id): same timer_id in a different queue is independent.""" + fake = FakeOutboxClient() + a = fake.feed(queue="q1", payload=b"x", timer_id="email-1") + b = fake.feed(queue="q2", payload=b"y", timer_id="email-1") + assert a is not None + assert b is not None + assert len(fake.rows) == 2 + + +async def test_fake_client_future_next_attempt_is_invisible_to_fetch() -> None: + fake = FakeOutboxClient() + future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=5) + fake.feed(queue="q", payload=b"x", next_attempt_at=future) + rows = await fake.fetch(["q"], limit=10, lease_ttl_seconds=60.0) + assert rows == [] + + +async def test_fake_client_cancel_timer_removes_unleased_row() -> None: + fake = FakeOutboxClient() + fake.feed(queue="q", payload=b"x", timer_id="email-1") + assert await fake.cancel_timer(queue="q", timer_id="email-1") is True + assert fake.rows == [] + + +async def test_fake_client_cancel_timer_unknown_returns_false() -> None: + fake = FakeOutboxClient() + assert await fake.cancel_timer(queue="q", timer_id="never-existed") is False + + +async def test_fake_client_cancel_timer_skips_leased_row() -> None: + fake = FakeOutboxClient() + fake.feed(queue="q", payload=b"x", timer_id="email-1") + # Simulate the row having been claimed by a worker. 
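+    # (The real fetch CTE does this via UPDATE ... acquired_token=:uuid, acquired_at=now();
+    # here we poke the fake row directly.)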
+ fake.rows[0].acquired_token = uuid.uuid4() + fake.rows[0].acquired_at = _dt.datetime.now(tz=_dt.UTC) + assert await fake.cancel_timer(queue="q", timer_id="email-1") is False + assert len(fake.rows) == 1 # row still there + + +async def test_test_broker_feed_forwards_timer_id() -> None: + broker = _make_broker() + test_broker = TestOutboxBroker(broker) + async with test_broker: + first = test_broker.feed("orders", b"x", timer_id="email-1") + second = test_broker.feed("orders", b"y", timer_id="email-1") + assert first is not None + assert second is None + assert len(test_broker.fake_client.rows) == 1 + assert test_broker.fake_client.rows[0].timer_id == "email-1" + + +async def test_fake_broker_delays_delivery_by_next_attempt_at() -> None: + """Row fed with next_attempt_at=future should not be dispatched until the gate opens.""" + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300) + test_broker.feed("orders", b'"delayed"', next_attempt_at=future, headers={"content-type": "application/json"}) + # Before the gate opens: nothing delivered. + await asyncio.sleep(0.1) + assert received == [] + # After the gate opens (and at least one fetch tick): delivered. + await _wait_until(lambda: received, timeout=2.0) + assert received == ["delayed"] diff --git a/tests/test_integration.py b/tests/test_integration.py index 208a778..e2dbdfd 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -365,6 +365,208 @@ async def handle(body: dict) -> None: assert len(set(fetch_pids)) == 1, f"persistent connection should hold one pid, saw {set(fetch_pids)}" +async def test_publish_with_activate_in_delays_delivery(pg_engine, outbox_table) -> None: + """Handler must not see the message until activate_in elapses.""" + received: list[dict] = [] + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + + @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2) + async def handle(body: dict) -> None: + received.append(body) + + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + async with session_factory() as session, session.begin(): + await broker.publish( + {"order_id": 1}, + queue="orders", + session=session, + activate_in=_dt.timedelta(milliseconds=500), + ) + # Before the gate opens: nothing delivered. + await asyncio.sleep(0.2) + assert received == [] + # After the gate opens: delivered. 
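+        # (With max_fetch_interval=0.2 the next poll after the gate opens picks the
+        # row up well inside the 3s timeout.)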
+ await _wait_until(lambda: received, timeout=3.0) + assert received == [{"order_id": 1}] + + +async def test_publish_with_activate_at_delays_delivery(pg_engine, outbox_table) -> None: + received: list[dict] = [] + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + + @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2) + async def handle(body: dict) -> None: + received.append(body) + + fire_at = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=500) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + async with session_factory() as session, session.begin(): + await broker.publish( + {"order_id": 2}, + queue="orders", + session=session, + activate_at=fire_at, + ) + await asyncio.sleep(0.2) + assert received == [] + await _wait_until(lambda: received, timeout=3.0) + assert received == [{"order_id": 2}] + + +async def test_publish_rejects_activate_in_and_at_together(pg_engine, outbox_table) -> None: + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + with pytest.raises(ValueError, match="activate_in / activate_at"): + await broker.publish( + {"x": 1}, + queue="orders", + session=session, + activate_in=_dt.timedelta(seconds=1), + activate_at=_dt.datetime.now(tz=_dt.UTC), + ) + + +async def test_publish_with_timer_id_dedups(pg_engine, outbox_table) -> None: + """Re-publishing the same (queue, timer_id) is a no-op — handler invoked exactly once.""" + received: list[dict] = [] + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + + @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2) + async def handle(body: dict) -> None: + received.append(body) + + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + async with session_factory() as session, session.begin(): + first_id = await broker.publish( + {"v": 1}, + queue="orders", + session=session, + timer_id="email-1", + ) + async with session_factory() as session, session.begin(): + second_id = await broker.publish( + {"v": 2}, + queue="orders", + session=session, + timer_id="email-1", + ) + await _wait_until(lambda: received, timeout=3.0) + + assert first_id is not None + assert second_id is None # second insert was a no-op + assert received == [{"v": 1}] # second body was never delivered + + +async def test_publish_timer_id_distinct_queues_are_independent(pg_engine, outbox_table) -> None: + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + a = await broker.publish({"x": 1}, queue="q1", session=session, timer_id="dup") + b = await broker.publish({"x": 2}, queue="q2", session=session, timer_id="dup") + assert a is not None + assert b is not None + assert await _row_count(pg_engine, outbox_table) == 2 + + +async def test_cancel_timer_before_fire_prevents_delivery(pg_engine, outbox_table) -> None: + received: list[dict] = [] + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + + @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2) + async def handle(body: dict) -> None: + received.append(body) # pragma: no cover # cancellation must prevent this + + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + async with 
session_factory() as session, session.begin(): + await broker.publish( + {"order_id": 9}, + queue="orders", + session=session, + activate_in=_dt.timedelta(seconds=1), + timer_id="email-cancel", + ) + # Cancel before activate_in elapses. + async with session_factory() as session, session.begin(): + cancelled = await broker.cancel_timer(queue="orders", timer_id="email-cancel", session=session) + assert cancelled is True + # Wait past the original fire time — handler must never see the row. + await asyncio.sleep(1.5) + assert received == [] + assert await _row_count(pg_engine, outbox_table) == 0 + + +async def test_cancel_timer_after_lease_taken_returns_false(pg_engine, outbox_table) -> None: + """If the row's already in flight, cancel is a no-op and delivery completes.""" + received: list[dict] = [] + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + handler_started = asyncio.Event() + release_handler = asyncio.Event() + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: dict) -> None: + handler_started.set() + await release_handler.wait() + received.append(body) + + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + async with session_factory() as session, session.begin(): + await broker.publish( + {"v": "in-flight"}, + queue="orders", + session=session, + timer_id="cant-cancel", + ) + # Wait for the handler to enter; the row is now leased. + await asyncio.wait_for(handler_started.wait(), timeout=3.0) + async with session_factory() as session, session.begin(): + cancelled = await broker.cancel_timer(queue="orders", timer_id="cant-cancel", session=session) + assert cancelled is False # lease guard prevented the DELETE + release_handler.set() + await _wait_until(lambda: received, timeout=3.0) + assert received == [{"v": "in-flight"}] + + +async def test_cancel_timer_unknown_returns_false(pg_engine, outbox_table) -> None: + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + cancelled = await broker.cancel_timer(queue="orders", timer_id="nope", session=session) + assert cancelled is False + + +async def test_publish_returns_inserted_row_id(pg_engine, outbox_table) -> None: + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + row_id = await broker.publish({"x": 1}, queue="orders", session=session) + assert isinstance(row_id, int) + assert row_id > 0 + + +async def test_publish_batch_with_activate_in_delays_all_rows(pg_engine, outbox_table) -> None: + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + await broker.publish_batch( + {"i": 1}, + {"i": 2}, + {"i": 3}, + queue="orders", + session=session, + activate_in=_dt.timedelta(minutes=10), # well past any test horizon + ) + # Rows are inserted but invisible to fetch (next_attempt_at in the future). 
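+    # (The fetch CTE's next_attempt_at <= now() predicate is what hides them; no
+    # running subscriber is needed for this check.)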
+ assert await _row_count(pg_engine, outbox_table) == 3 + client = OutboxClient(pg_engine, outbox_table) + assert await client.fetch(["orders"], limit=10, lease_ttl_seconds=60.0) == [] + + async def test_notify_payload_carries_queue_name(pg_engine, outbox_table) -> None: """The NOTIFY payload Postgres delivers to LISTEN clients must equal the queue name.""" received_payloads: list[str] = [] diff --git a/tests/test_unit.py b/tests/test_unit.py index 19b390e..f45b56a 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -4,6 +4,8 @@ import pytest from sqlalchemy import MetaData +from sqlalchemy.dialects import postgresql +from sqlalchemy.ext.asyncio import AsyncSession from faststream_outbox import ( ConstantRetry, @@ -29,6 +31,20 @@ def _make_broker(engine: object | None = None, table_name: str = "outbox") -> Ou return OutboxBroker(outbox_table=table) +def _make_session_mock(*, scalar_return: object = 42) -> AsyncMock: + """ + Build an AsyncSession mock whose ``execute()`` returns a sync MagicMock. + + AsyncMock(spec=AsyncSession) makes the return_value of execute() default to an + AsyncMock — so ``result.scalar()`` would itself return a coroutine. The broker's + real CursorResult.scalar() is sync, so override the return_value to a MagicMock. + """ + session = AsyncMock(spec=AsyncSession) + session.execute.return_value = MagicMock() + session.execute.return_value.scalar.return_value = scalar_return + return session + + # --- make_outbox_table --- @@ -48,11 +64,22 @@ def test_make_outbox_table_columns_present() -> None: "last_attempt_at", "acquired_at", "acquired_token", + "timer_id", } assert {c.name for c in t.columns} == expected assert t.name == "my_outbox" +def test_make_outbox_table_declares_timer_unique_index() -> None: + metadata = MetaData() + t = make_outbox_table(metadata, table_name="my_outbox") + timer_idx = next(idx for idx in t.indexes if idx.name == "my_outbox_timer_id_uq") + assert timer_idx.unique is True + assert [c.name for c in timer_idx.columns] == ["queue", "timer_id"] + # Partial-index predicate ensures non-timer rows aren't constrained + assert timer_idx.dialect_options["postgresql"]["where"] is not None + + def test_make_outbox_table_attaches_to_metadata() -> None: metadata = MetaData() t = make_outbox_table(metadata, table_name="outbox") @@ -276,10 +303,8 @@ async def test_broker_publish_rejects_non_async_session() -> None: async def test_broker_publish_executes_insert_then_pg_notify_on_session() -> None: - from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415 - broker = _make_broker() - session = AsyncMock(spec=AsyncSession) + session = _make_session_mock() await broker.publish({"order_id": 1}, queue="orders", session=session) # Two execute calls: the INSERT, then SELECT pg_notify(...). 
assert session.execute.await_count == 2 @@ -296,10 +321,8 @@ async def test_broker_publish_executes_insert_then_pg_notify_on_session() -> Non async def test_broker_publish_does_not_commit() -> None: - from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415 - broker = _make_broker() - session = AsyncMock(spec=AsyncSession) + session = _make_session_mock() await broker.publish(b"x", queue="orders", session=session) session.commit.assert_not_called() session.flush.assert_not_called() @@ -373,6 +396,145 @@ async def test_broker_publish_batch_no_bodies_is_noop() -> None: session.execute.assert_not_called() +async def test_broker_publish_rejects_activate_in_and_at_together() -> None: + broker = _make_broker() + session = _make_session_mock() + with pytest.raises(ValueError, match="activate_in / activate_at"): + await broker.publish( + b"x", + queue="orders", + session=session, + activate_in=_dt.timedelta(seconds=1), + activate_at=_dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=1), + ) + + +async def test_broker_publish_with_activate_in_skips_notify() -> None: + broker = _make_broker() + session = _make_session_mock() + await broker.publish(b"x", queue="orders", session=session, activate_in=_dt.timedelta(seconds=30)) + # Only the INSERT — no NOTIFY for future-dated rows. + assert session.execute.await_count == 1 + insert_stmt = session.execute.await_args_list[0].args[0] + assert "INSERT INTO" in str(insert_stmt) + assert "next_attempt_at" in str(insert_stmt) + + +async def test_broker_publish_with_activate_at_skips_notify() -> None: + broker = _make_broker() + session = _make_session_mock() + fire = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=5) + await broker.publish(b"x", queue="orders", session=session, activate_at=fire) + assert session.execute.await_count == 1 + params = session.execute.await_args_list[0].args[0].compile().params + assert params["next_attempt_at"] == fire + + +async def test_broker_publish_with_timer_id_uses_on_conflict() -> None: + broker = _make_broker() + session = _make_session_mock() + await broker.publish(b"x", queue="orders", session=session, timer_id="email-123") + insert_stmt = session.execute.await_args_list[0].args[0] + compiled = insert_stmt.compile(dialect=postgresql.dialect()) + sql = str(compiled) + assert "INSERT INTO" in sql + assert "ON CONFLICT" in sql + assert "DO NOTHING" in sql + assert compiled.params["timer_id"] == "email-123" + + +async def test_broker_publish_returns_none_on_timer_id_conflict() -> None: + broker = _make_broker() + # Simulate ON CONFLICT DO NOTHING returning no rows: scalar() → None + session = _make_session_mock(scalar_return=None) + result = await broker.publish(b"x", queue="orders", session=session, timer_id="dup") + assert result is None + # NOTIFY skipped when nothing was inserted. 
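+    # (Only the INSERT ran; the SELECT pg_notify(...) would have been a second execute call.)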
+ assert session.execute.await_count == 1 + + +async def test_broker_publish_batch_rejects_activate_in_and_at_together() -> None: + broker = _make_broker() + session = AsyncMock(spec=AsyncSession) + with pytest.raises(ValueError, match="activate_in / activate_at"): + await broker.publish_batch( + b"a", + queue="orders", + session=session, + activate_in=_dt.timedelta(seconds=1), + activate_at=_dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=1), + ) + + +async def test_broker_publish_batch_with_activate_in_skips_notify() -> None: + broker = _make_broker() + session = AsyncMock(spec=AsyncSession) + await broker.publish_batch( + b"a", + b"b", + queue="orders", + session=session, + activate_in=_dt.timedelta(seconds=30), + ) + # Insert only — no NOTIFY for future-dated batch. + assert session.execute.await_count == 1 + rows = session.execute.await_args_list[0].args[1] + assert all("next_attempt_at" in r for r in rows) + + +async def test_broker_publish_batch_with_activate_at_skips_notify() -> None: + broker = _make_broker() + session = AsyncMock(spec=AsyncSession) + fire = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=5) + await broker.publish_batch(b"a", b"b", queue="orders", session=session, activate_at=fire) + # No NOTIFY: future-dated rows. + assert session.execute.await_count == 1 + rows = session.execute.await_args_list[0].args[1] + assert all(r["next_attempt_at"] == fire for r in rows) + + +async def test_broker_publish_batch_does_not_accept_timer_id() -> None: + """publish_batch must not expose per-row dedup; timer_id is a publish()-only kwarg.""" + broker = _make_broker() + session = AsyncMock(spec=AsyncSession) + with pytest.raises(TypeError, match="timer_id"): + await broker.publish_batch( + b"a", + queue="orders", + session=session, + timer_id="x", # ty: ignore[unknown-argument] + ) + + +async def test_broker_cancel_timer_rejects_non_async_session() -> None: + broker = _make_broker() + with pytest.raises(TypeError, match="AsyncSession"): + await broker.cancel_timer(queue="orders", timer_id="x", session=object()) # ty: ignore[invalid-argument-type] + + +async def test_broker_cancel_timer_emits_delete_with_lease_guard() -> None: + broker = _make_broker() + session = AsyncMock(spec=AsyncSession) + session.execute.return_value.rowcount = 1 + deleted = await broker.cancel_timer(queue="orders", timer_id="email-1", session=session) + assert deleted is True + delete_stmt = session.execute.await_args_list[0].args[0] + sql = str(delete_stmt) + assert "DELETE" in sql + assert "acquired_token IS NULL" in sql + params = delete_stmt.compile().params + assert params["queue_1"] == "orders" + assert params["timer_id_1"] == "email-1" + + +async def test_broker_cancel_timer_returns_false_when_nothing_deleted() -> None: + broker = _make_broker() + session = AsyncMock(spec=AsyncSession) + session.execute.return_value.rowcount = 0 + deleted = await broker.cancel_timer(queue="orders", timer_id="x", session=session) + assert deleted is False + + async def test_broker_publish_batch_executes_single_insert_for_many_rows() -> None: from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415