diff --git a/CLAUDE.md b/CLAUDE.md
index c5ec16c..3acc4ed 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Project
-`faststream-outbox` is a FastStream broker integration that uses a Postgres table as the message queue (transactional outbox pattern). Postgres-only at v0; polling-only (no LISTEN/NOTIFY).
+`faststream-outbox` is a FastStream broker integration that uses a Postgres table as the message queue (transactional outbox pattern). Postgres-only at v0. Subscribers poll the table and use LISTEN/NOTIFY to short-circuit idle waits.
## Commands
@@ -22,12 +22,22 @@ The package wires a FastStream `Broker`/`Registrator`/`Subscriber` trio whose tr
### Producer side
-`broker.publish(body, *, queue, session, headers=None, correlation_id=None)` and `broker.publish_batch(*bodies, queue, session, ...)` insert outbox rows through the caller's `AsyncSession` (`session.execute(insert(table).values(...))`). They do **not** flush, commit, or open their own transaction — the row must commit with the caller's domain writes. Both reject anything that is not an `AsyncSession` with `TypeError`.
+`broker.publish(body, *, queue, session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None)` and `broker.publish_batch(*bodies, queue, session, headers=None, activate_in=None, activate_at=None)` insert outbox rows through the caller's `AsyncSession`. They do **not** flush, commit, or open their own transaction — the row must commit with the caller's domain writes. Both reject anything that is not an `AsyncSession` with `TypeError`. `publish` returns the inserted row's id (or `None` on `timer_id` conflict); `publish_batch` returns nothing and does not accept `timer_id` (per-row dedup makes no sense in a batch).
`broker.request` raises `NotImplementedError` (outbox is fire-and-forget). `OutboxRegistrator.publisher` also raises. The `_NoProducer` stub exists only to satisfy FastStream's broker producer slot.
`_encode_payload` (in `envelope.py`) is the internal helper that turns `body` into `(payload_bytes, headers_dict)`. Not exported.
+### Timers (delayed delivery)
+
+`activate_in: timedelta` / `activate_at: datetime` (mutually exclusive) set `next_attempt_at` so the row is invisible to fetch until the gate opens — the `next_attempt_at <= now()` predicate in the fetch CTE is what gates eligibility, so no subscriber-side change is needed for scheduling. For `publish`, `next_attempt_at` is computed server-side via `now() + make_interval(secs => :s)` to stay clock-skew-safe; for `publish_batch` it's client-side (`datetime.now(UTC) + activate_in`) because executemany doesn't compose cleanly with column-level SQL expressions, and the few-ms drift is harmless for user-supplied scheduling.
+
+`timer_id` (single `publish` only) flows into a `String(255)` column with a partial unique index on `(queue, timer_id) WHERE timer_id IS NOT NULL`. The producer switches to `pg_insert(...).on_conflict_do_nothing(index_elements=[queue, timer_id], index_where=timer_id IS NOT NULL)` so re-publishing the same id is a silent no-op (returns `None`). NOTIFY is skipped when `activate_in`/`activate_at` is set OR the conflict path returned no row — both cases would either wake listeners that find nothing, or wake them prematurely.
+
+`broker.cancel_timer(*, queue, timer_id, session)` issues `DELETE WHERE queue=? AND timer_id=? AND acquired_token IS NULL` on the caller's session — the `acquired_token IS NULL` guard is load-bearing: it preserves the lease-token invariant by refusing to clobber a row whose handler is already in flight (returns `False` in that case; the delivery completes normally).
+
+Latency floor: timer firing latency is bounded by `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. NOTIFY does not help here — listeners can't act on a future row. Sub-second precision is not a goal of this broker.
+
### User-owned schema
`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare the partial index `(queue, next_attempt_at) WHERE acquired_token IS NULL` on the table itself, so Alembic autogenerate brings it up. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ).
@@ -38,7 +48,7 @@ Per subscriber:
1. **`_fetch_loop`** — owns a long-lived `AsyncConnection` for the fetch CTE and a separate raw asyncpg connection for `LISTEN outbox_
`. Single CTE: `SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`. The CTE's WHERE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - make_interval(secs => :lease_ttl)`), so there is no separate stuck-row reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` — idle dispatch latency drops from up to `max_fetch_interval` (default 10s) to ~10ms. If LISTEN setup fails (asyncpg missing, non-asyncpg driver, permission error), the loop logs once and falls back to polling. On any DB error the connections are closed, the loop backs off exponentially (capped by `_BACKOFF_EXP_CAP=30`), and reopens. Test broker (no real engine) skips the persistent-connection / LISTEN path entirely and uses `client.fetch(...)` per iteration.
2. **`_worker_loop`** × `max_workers` — pulls from an in-process `asyncio.Queue(maxsize=fetch_batch_size)`, dispatches via `consume()`, then flushes the row's terminal state. Default `AckPolicy.NACK_ON_ERROR`.
-Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_', queue)` on the caller's session right after the INSERT. NOTIFY is transactional — listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.
+Producer side: `broker.publish` and `publish_batch` emit `SELECT pg_notify('outbox_', queue)` on the caller's session right after the INSERT, **except** when the row is future-dated (`activate_in`/`activate_at` set) or a `timer_id` conflict made the insert a no-op — both cases skip NOTIFY since listeners can't act on the result. NOTIFY is transactional: listeners only see it after the user's transaction commits, so atomicity with the row insert is automatic. Rolled-back transactions silently drop the NOTIFY.
Channel naming convention: `outbox_`. Postgres limits identifiers to 63 chars, so users with table names longer than ~56 chars will silently lose the NOTIFY wake-up and degrade to polling.
diff --git a/README.md b/README.md
index 12e2292..7b61b39 100644
--- a/README.md
+++ b/README.md
@@ -36,9 +36,9 @@ async with session_factory() as session, session.begin():
`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` that you attach to your own `MetaData` and migrate via Alembic. The package does **not** own your schema; it only describes the columns it needs.
-`broker.publish(body, *, queue, session, headers=None, correlation_id=None)` inserts one outbox row through the caller's `AsyncSession`. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an `async with session.begin():` block.
+`broker.publish(body, *, queue, session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None)` inserts one outbox row through the caller's `AsyncSession`. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an `async with session.begin():` block. See [Timers](#timers-delayed-delivery) for `activate_in` / `activate_at` / `timer_id`.
-`broker.publish_batch(*bodies, queue, session, headers=None)` inserts many rows in a single round-trip with the same transactional contract.
+`broker.publish_batch(*bodies, queue, session, headers=None, activate_in=None, activate_at=None)` inserts many rows in a single round-trip with the same transactional contract.
A subscriber owns two async loops:
@@ -49,6 +49,40 @@ The `acquired_token` is critical: a slow handler whose lease expired and was re-
`lease_ttl_seconds` (default `60.0`) **must exceed your handler's P99 duration with margin** — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery.
+## Timers (delayed delivery)
+
+Schedule a publish to fire later by passing `activate_in` (relative) or `activate_at` (absolute, tz-aware) — exactly one. Pass `timer_id` to deduplicate per `(queue, timer_id)`; cancel a not-yet-leased timer with `broker.cancel_timer(...)`.
+
+```python
+import datetime as dt
+
+# Fire 30 seconds from now, deduplicated by timer_id:
+await broker.publish(
+ {"order_id": 1},
+ queue="orders",
+ session=session,
+ activate_in=dt.timedelta(seconds=30),
+ timer_id=f"order-confirm-{order.id}",
+)
+
+# Fire at a specific UTC instant:
+await broker.publish(
+ {"x": 1}, queue="orders", session=session,
+ activate_at=dt.datetime(2026, 6, 1, 9, tzinfo=dt.UTC),
+)
+
+# Cancel before it fires (no-op if the row is already in flight):
+await broker.cancel_timer(queue="orders", timer_id="order-confirm-42", session=session)
+```
+
+`publish` returns the inserted row's `id`, or `None` if a row with the same `(queue, timer_id)` already exists. `cancel_timer` returns `True` if it deleted the row; `False` means either the timer didn't exist or was already leased to a worker (in which case delivery completes normally).
+
+`publish_batch` accepts `activate_in` / `activate_at` to schedule every row in the batch identically — but no `timer_id` (per-row dedup makes no sense for a batch).
+
+**Latency floor:** firing latency is bounded by the subscriber's `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. Lower it for sub-10s precision; sub-second precision is not a goal of this broker.
+
+**Migration note:** existing deployments must regenerate Alembic migrations after upgrading — the new `timer_id` column and `_timer_id_uq` partial unique index need to land in the database before publish-with-`timer_id` works.
+
## Schema validation
Schema validation is opt-in:
diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py
index 5c176e1..2100466 100644
--- a/faststream_outbox/broker.py
+++ b/faststream_outbox/broker.py
@@ -6,6 +6,7 @@
owns subscribers on the consumer side.
"""
+import datetime as _dt
import logging
import typing
from collections.abc import Iterable, Sequence
@@ -22,7 +23,8 @@
from faststream._internal.types import BrokerMiddleware, CustomCallable
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict
-from sqlalchemy import insert, text
+from sqlalchemy import Float, bindparam, delete, func, insert, text
+from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from faststream_outbox.client import OutboxClient
@@ -186,7 +188,7 @@ async def validate_schema(self) -> None:
"""Validate the user's table matches what the package expects. Opt-in."""
await self.client.validate_schema()
- async def publish( # ty: ignore[invalid-method-override]
+ async def publish( # ty: ignore[invalid-method-override] # noqa: PLR0913
self,
body: typing.Any,
*,
@@ -194,7 +196,10 @@ async def publish( # ty: ignore[invalid-method-override]
session: AsyncSession,
headers: dict[str, str] | None = None,
correlation_id: str | None = None,
- ) -> None:
+ activate_in: _dt.timedelta | None = None,
+ activate_at: _dt.datetime | None = None,
+ timer_id: str | None = None,
+ ) -> int | None:
"""
Insert one outbox row using *session*'s open transaction.
@@ -202,13 +207,56 @@ async def publish( # ty: ignore[invalid-method-override]
``async with session.begin():`` block). ``publish`` does not flush, commit,
or open its own transaction — that is the whole point of the transactional
outbox pattern: the row commits atomically with the caller's domain writes.
+
+ Schedule a delayed delivery by passing exactly one of *activate_in* (relative)
+ or *activate_at* (absolute, tz-aware). Pass *timer_id* to deduplicate per
+ ``(queue, timer_id)`` — re-publishing with the same id is a no-op (returns
+ ``None``). Cancel a not-yet-leased timer with :meth:`cancel_timer`.
+
+ Returns the inserted row's ``id`` (BigInt PK), or ``None`` if a timer with
+ the same ``(queue, timer_id)`` already exists.
"""
if not isinstance(session, AsyncSession):
msg = "broker.publish requires an sqlalchemy.ext.asyncio.AsyncSession"
raise TypeError(msg)
+ if activate_in is not None and activate_at is not None:
+ msg = "broker.publish accepts at most one of activate_in / activate_at"
+ raise ValueError(msg)
payload, hdrs = _encode_payload(body, headers=headers, correlation_id=correlation_id)
- await session.execute(insert(self._outbox_table).values(queue=queue, payload=payload, headers=hdrs))
- await self._notify(session, queue)
+ t = self._outbox_table
+ values: dict[str, typing.Any] = {"queue": queue, "payload": payload, "headers": hdrs}
+ # Server-side compute keeps timing immune to worker/DB clock skew (mirrors
+ # client.mark_pending_with_lease).
+ if activate_in is not None:
+ values["next_attempt_at"] = func.now() + func.make_interval(
+ 0, 0, 0, 0, 0, 0, bindparam("activate_in_seconds", activate_in.total_seconds(), type_=Float)
+ )
+ elif activate_at is not None:
+ values["next_attempt_at"] = activate_at
+ if timer_id is not None:
+ values["timer_id"] = timer_id
+ is_future = activate_in is not None or activate_at is not None
+
+ if timer_id is not None:
+ stmt = (
+ pg_insert(t)
+ .values(**values)
+ .on_conflict_do_nothing(
+ index_elements=[t.c.queue, t.c.timer_id],
+ index_where=t.c.timer_id.is_not(None),
+ )
+ .returning(t.c.id)
+ )
+ else:
+ stmt = insert(t).values(**values).returning(t.c.id)
+
+ result = await session.execute(stmt)
+ row_id: int | None = result.scalar()
+ # Skip NOTIFY for future-dated rows (listeners can't act before the gate
+ # opens — polling fires them at next tick) and on conflict (no row landed).
+ if row_id is not None and not is_future:
+ await self._notify(session, queue)
+ return row_id
async def publish_batch( # ty: ignore[invalid-method-override]
self,
@@ -216,24 +264,70 @@ async def publish_batch( # ty: ignore[invalid-method-override]
queue: str,
session: AsyncSession,
headers: dict[str, str] | None = None,
+ activate_in: _dt.timedelta | None = None,
+ activate_at: _dt.datetime | None = None,
) -> None:
"""
Insert multiple outbox rows via *session*. Same transactional contract as ``publish``.
Each row gets its own auto-generated ``correlation_id``; pass *headers* to
- share static headers across all rows.
+ share static headers across all rows. *activate_in* / *activate_at* schedule
+ every row in the batch identically — per-row timer dedup is not supported,
+ use :meth:`publish` for that.
"""
if not isinstance(session, AsyncSession):
msg = "broker.publish_batch requires an sqlalchemy.ext.asyncio.AsyncSession"
raise TypeError(msg)
+ if activate_in is not None and activate_at is not None:
+ msg = "broker.publish_batch accepts at most one of activate_in / activate_at"
+ raise ValueError(msg)
if not bodies:
return
+ # Client-side time for batch: executemany doesn't compose with column-level
+ # SQL expressions easily, and a few-ms drift versus the DB is harmless for
+ # user-supplied scheduling. (Retries still use server time via mark_pending_with_lease.)
+ next_at: _dt.datetime | None = None
+ if activate_in is not None:
+ next_at = _dt.datetime.now(tz=_dt.UTC) + activate_in
+ elif activate_at is not None:
+ next_at = activate_at
rows = []
for body in bodies:
payload, hdrs = _encode_payload(body, headers=headers)
- rows.append({"queue": queue, "payload": payload, "headers": hdrs})
+ row: dict[str, typing.Any] = {"queue": queue, "payload": payload, "headers": hdrs}
+ if next_at is not None:
+ row["next_attempt_at"] = next_at
+ rows.append(row)
await session.execute(insert(self._outbox_table), rows)
- await self._notify(session, queue)
+ if next_at is None:
+ await self._notify(session, queue)
+
+ async def cancel_timer(
+ self,
+ *,
+ queue: str,
+ timer_id: str,
+ session: AsyncSession,
+ ) -> bool:
+ """
+ Delete a not-yet-leased timer row. Returns True if a row was deleted.
+
+ Same transactional contract as :meth:`publish` — runs on the caller's session
+ and commits with their transaction. The ``acquired_token IS NULL`` guard
+ prevents canceling a row whose handler is already in flight: that returns
+ False and the delivery completes normally.
+ """
+ if not isinstance(session, AsyncSession):
+ msg = "broker.cancel_timer requires an sqlalchemy.ext.asyncio.AsyncSession"
+ raise TypeError(msg)
+ t = self._outbox_table
+ stmt = delete(t).where(
+ t.c.queue == queue,
+ t.c.timer_id == timer_id,
+ t.c.acquired_token.is_(None),
+ )
+ result = await session.execute(stmt)
+ return (result.rowcount or 0) > 0 # ty: ignore[unresolved-attribute]
async def _notify(self, session: AsyncSession, queue: str) -> None:
"""
diff --git a/faststream_outbox/schema.py b/faststream_outbox/schema.py
index a41e79b..3c258bd 100644
--- a/faststream_outbox/schema.py
+++ b/faststream_outbox/schema.py
@@ -53,6 +53,7 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table
Column("last_attempt_at", DateTime(timezone=True), nullable=True),
Column("acquired_at", DateTime(timezone=True), nullable=True),
Column("acquired_token", Uuid, nullable=True),
+ Column("timer_id", String(255), nullable=True),
)
# Partial index that backs the fetch query's hot branch
# (`WHERE acquired_token IS NULL AND queue = ? AND next_attempt_at <= now()`).
@@ -63,4 +64,14 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table
table.c.next_attempt_at,
postgresql_where=table.c.acquired_token.is_(None),
)
+ # Partial unique index that backs `publish(..., timer_id=...)`'s ON CONFLICT DO NOTHING
+ # and the (queue, timer_id) lookup in `cancel_timer`. Only enforced when timer_id is set,
+ # so non-timer rows remain unconstrained.
+ Index(
+ f"{table_name}_timer_id_uq",
+ table.c.queue,
+ table.c.timer_id,
+ unique=True,
+ postgresql_where=table.c.timer_id.is_not(None),
+ )
return table
diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py
index 68f9f70..e170804 100644
--- a/faststream_outbox/testing.py
+++ b/faststream_outbox/testing.py
@@ -43,6 +43,7 @@ class _FakeRow:
last_attempt_at: _dt.datetime | None = None
acquired_at: _dt.datetime | None = None
acquired_token: uuid.UUID | None = None
+ timer_id: str | None = None
class FakeOutboxClient:
@@ -59,13 +60,19 @@ def feed(
payload: bytes,
headers: dict[str, str] | None = None,
next_attempt_at: _dt.datetime | None = None,
- ) -> int:
+ timer_id: str | None = None,
+ ) -> int | None:
+ # Mirror the real client's partial-unique-on-(queue, timer_id) behavior:
+ # re-feeding a timer that already exists is a no-op.
+ if timer_id is not None and any(r.queue == queue and r.timer_id == timer_id for r in self._rows):
+ return None
row = _FakeRow(
id=self._next_id,
queue=queue,
payload=payload,
headers=headers,
next_attempt_at=next_attempt_at or _utcnow(),
+ timer_id=timer_id,
)
self._rows.append(row)
self._next_id += 1
@@ -153,6 +160,14 @@ async def mark_pending_with_lease( # noqa: PLR0913
return True
return False
+ async def cancel_timer(self, *, queue: str, timer_id: str) -> bool:
+ """Mirror :meth:`OutboxBroker.cancel_timer` — drop a not-yet-leased timer row."""
+ for i, row in enumerate(self._rows):
+ if row.queue == queue and row.timer_id == timer_id and row.acquired_token is None:
+ del self._rows[i]
+ return True
+ return False
+
async def validate_schema(self) -> None:
return
@@ -193,13 +208,15 @@ def feed(
*,
headers: dict[str, str] | None = None,
next_attempt_at: _dt.datetime | None = None,
- ) -> int:
- """Insert a row directly into the in-memory store. Returns the row id."""
+ timer_id: str | None = None,
+ ) -> int | None:
+ """Insert a row directly into the in-memory store. Returns the row id, or None on timer_id conflict."""
return self.fake_client.feed(
queue=queue,
payload=payload,
headers=headers,
next_attempt_at=next_attempt_at,
+ timer_id=timer_id,
)
@contextmanager
diff --git a/tests/test_fake.py b/tests/test_fake.py
index a061820..0787207 100644
--- a/tests/test_fake.py
+++ b/tests/test_fake.py
@@ -591,3 +591,86 @@ async def test_fake_client_fetch_with_conn_mirrors_fetch() -> None:
assert len(rows) == 1
assert rows[0].queue == "q"
assert rows[0].payload == b"x"
+
+
+async def test_fake_client_feed_timer_id_dedup() -> None:
+ fake = FakeOutboxClient()
+ first = fake.feed(queue="q", payload=b"x", timer_id="email-1")
+ second = fake.feed(queue="q", payload=b"y", timer_id="email-1")
+ assert first is not None
+ assert second is None # second feed is a no-op
+ assert len(fake.rows) == 1
+ assert fake.rows[0].payload == b"x" # first wins
+
+
+async def test_fake_client_feed_timer_id_different_queues_allowed() -> None:
+ """Unique-on-(queue, timer_id): same timer_id in a different queue is independent."""
+ fake = FakeOutboxClient()
+ a = fake.feed(queue="q1", payload=b"x", timer_id="email-1")
+ b = fake.feed(queue="q2", payload=b"y", timer_id="email-1")
+ assert a is not None
+ assert b is not None
+ assert len(fake.rows) == 2
+
+
+async def test_fake_client_future_next_attempt_is_invisible_to_fetch() -> None:
+ fake = FakeOutboxClient()
+ future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=5)
+ fake.feed(queue="q", payload=b"x", next_attempt_at=future)
+ rows = await fake.fetch(["q"], limit=10, lease_ttl_seconds=60.0)
+ assert rows == []
+
+
+async def test_fake_client_cancel_timer_removes_unleased_row() -> None:
+ fake = FakeOutboxClient()
+ fake.feed(queue="q", payload=b"x", timer_id="email-1")
+ assert await fake.cancel_timer(queue="q", timer_id="email-1") is True
+ assert fake.rows == []
+
+
+async def test_fake_client_cancel_timer_unknown_returns_false() -> None:
+ fake = FakeOutboxClient()
+ assert await fake.cancel_timer(queue="q", timer_id="never-existed") is False
+
+
+async def test_fake_client_cancel_timer_skips_leased_row() -> None:
+ fake = FakeOutboxClient()
+ fake.feed(queue="q", payload=b"x", timer_id="email-1")
+ # Simulate the row having been claimed by a worker.
+ fake.rows[0].acquired_token = uuid.uuid4()
+ fake.rows[0].acquired_at = _dt.datetime.now(tz=_dt.UTC)
+ assert await fake.cancel_timer(queue="q", timer_id="email-1") is False
+ assert len(fake.rows) == 1 # row still there
+
+
+async def test_test_broker_feed_forwards_timer_id() -> None:
+ broker = _make_broker()
+ test_broker = TestOutboxBroker(broker)
+ async with test_broker:
+ first = test_broker.feed("orders", b"x", timer_id="email-1")
+ second = test_broker.feed("orders", b"y", timer_id="email-1")
+ assert first is not None
+ assert second is None
+ assert len(test_broker.fake_client.rows) == 1
+ assert test_broker.fake_client.rows[0].timer_id == "email-1"
+
+
+async def test_fake_broker_delays_delivery_by_next_attempt_at() -> None:
+ """Row fed with next_attempt_at=future should not be dispatched until the gate opens."""
+ broker = _make_broker()
+ received: list[str] = []
+
+ @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
+ async def handle(body: str) -> None:
+ received.append(body)
+
+ test_broker = TestOutboxBroker(broker)
+ async with test_broker:
+ future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300)
+ test_broker.feed("orders", b'"delayed"', next_attempt_at=future, headers={"content-type": "application/json"})
+ # Before the gate opens: nothing delivered.
+ await asyncio.sleep(0.1)
+ assert received == []
+ # After the gate opens (and at least one fetch tick): delivered.
+ await _wait_until(lambda: received, timeout=2.0)
+ assert received == ["delayed"]
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 208a778..e2dbdfd 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -365,6 +365,208 @@ async def handle(body: dict) -> None:
assert len(set(fetch_pids)) == 1, f"persistent connection should hold one pid, saw {set(fetch_pids)}"
+async def test_publish_with_activate_in_delays_delivery(pg_engine, outbox_table) -> None:
+ """Handler must not see the message until activate_in elapses."""
+ received: list[dict] = []
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+
+ @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2)
+ async def handle(body: dict) -> None:
+ received.append(body)
+
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with broker:
+ async with session_factory() as session, session.begin():
+ await broker.publish(
+ {"order_id": 1},
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(milliseconds=500),
+ )
+ # Before the gate opens: nothing delivered.
+ await asyncio.sleep(0.2)
+ assert received == []
+ # After the gate opens: delivered.
+ await _wait_until(lambda: received, timeout=3.0)
+ assert received == [{"order_id": 1}]
+
+
+async def test_publish_with_activate_at_delays_delivery(pg_engine, outbox_table) -> None:
+ received: list[dict] = []
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+
+ @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2)
+ async def handle(body: dict) -> None:
+ received.append(body)
+
+ fire_at = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=500)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with broker:
+ async with session_factory() as session, session.begin():
+ await broker.publish(
+ {"order_id": 2},
+ queue="orders",
+ session=session,
+ activate_at=fire_at,
+ )
+ await asyncio.sleep(0.2)
+ assert received == []
+ await _wait_until(lambda: received, timeout=3.0)
+ assert received == [{"order_id": 2}]
+
+
+async def test_publish_rejects_activate_in_and_at_together(pg_engine, outbox_table) -> None:
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ with pytest.raises(ValueError, match="activate_in / activate_at"):
+ await broker.publish(
+ {"x": 1},
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(seconds=1),
+ activate_at=_dt.datetime.now(tz=_dt.UTC),
+ )
+
+
+async def test_publish_with_timer_id_dedups(pg_engine, outbox_table) -> None:
+ """Re-publishing the same (queue, timer_id) is a no-op — handler invoked exactly once."""
+ received: list[dict] = []
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+
+ @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2)
+ async def handle(body: dict) -> None:
+ received.append(body)
+
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with broker:
+ async with session_factory() as session, session.begin():
+ first_id = await broker.publish(
+ {"v": 1},
+ queue="orders",
+ session=session,
+ timer_id="email-1",
+ )
+ async with session_factory() as session, session.begin():
+ second_id = await broker.publish(
+ {"v": 2},
+ queue="orders",
+ session=session,
+ timer_id="email-1",
+ )
+ await _wait_until(lambda: received, timeout=3.0)
+
+ assert first_id is not None
+ assert second_id is None # second insert was a no-op
+ assert received == [{"v": 1}] # second body was never delivered
+
+
+async def test_publish_timer_id_distinct_queues_are_independent(pg_engine, outbox_table) -> None:
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ a = await broker.publish({"x": 1}, queue="q1", session=session, timer_id="dup")
+ b = await broker.publish({"x": 2}, queue="q2", session=session, timer_id="dup")
+ assert a is not None
+ assert b is not None
+ assert await _row_count(pg_engine, outbox_table) == 2
+
+
+async def test_cancel_timer_before_fire_prevents_delivery(pg_engine, outbox_table) -> None:
+ received: list[dict] = []
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+
+ @broker.subscriber("orders", min_fetch_interval=0.05, max_fetch_interval=0.2)
+ async def handle(body: dict) -> None:
+ received.append(body) # pragma: no cover # cancellation must prevent this
+
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with broker:
+ async with session_factory() as session, session.begin():
+ await broker.publish(
+ {"order_id": 9},
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(seconds=1),
+ timer_id="email-cancel",
+ )
+ # Cancel before activate_in elapses.
+ async with session_factory() as session, session.begin():
+ cancelled = await broker.cancel_timer(queue="orders", timer_id="email-cancel", session=session)
+ assert cancelled is True
+ # Wait past the original fire time — handler must never see the row.
+ await asyncio.sleep(1.5)
+ assert received == []
+ assert await _row_count(pg_engine, outbox_table) == 0
+
+
+async def test_cancel_timer_after_lease_taken_returns_false(pg_engine, outbox_table) -> None:
+ """If the row's already in flight, cancel is a no-op and delivery completes."""
+ received: list[dict] = []
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ handler_started = asyncio.Event()
+ release_handler = asyncio.Event()
+
+ @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
+ async def handle(body: dict) -> None:
+ handler_started.set()
+ await release_handler.wait()
+ received.append(body)
+
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with broker:
+ async with session_factory() as session, session.begin():
+ await broker.publish(
+ {"v": "in-flight"},
+ queue="orders",
+ session=session,
+ timer_id="cant-cancel",
+ )
+ # Wait for the handler to enter; the row is now leased.
+ await asyncio.wait_for(handler_started.wait(), timeout=3.0)
+ async with session_factory() as session, session.begin():
+ cancelled = await broker.cancel_timer(queue="orders", timer_id="cant-cancel", session=session)
+ assert cancelled is False # lease guard prevented the DELETE
+ release_handler.set()
+ await _wait_until(lambda: received, timeout=3.0)
+ assert received == [{"v": "in-flight"}]
+
+
+async def test_cancel_timer_unknown_returns_false(pg_engine, outbox_table) -> None:
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ cancelled = await broker.cancel_timer(queue="orders", timer_id="nope", session=session)
+ assert cancelled is False
+
+
+async def test_publish_returns_inserted_row_id(pg_engine, outbox_table) -> None:
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ row_id = await broker.publish({"x": 1}, queue="orders", session=session)
+ assert isinstance(row_id, int)
+ assert row_id > 0
+
+
+async def test_publish_batch_with_activate_in_delays_all_rows(pg_engine, outbox_table) -> None:
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ await broker.publish_batch(
+ {"i": 1},
+ {"i": 2},
+ {"i": 3},
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(minutes=10), # well past any test horizon
+ )
+ # Rows are inserted but invisible to fetch (next_attempt_at in the future).
+ assert await _row_count(pg_engine, outbox_table) == 3
+ client = OutboxClient(pg_engine, outbox_table)
+ assert await client.fetch(["orders"], limit=10, lease_ttl_seconds=60.0) == []
+
+
async def test_notify_payload_carries_queue_name(pg_engine, outbox_table) -> None:
"""The NOTIFY payload Postgres delivers to LISTEN clients must equal the queue name."""
received_payloads: list[str] = []
diff --git a/tests/test_unit.py b/tests/test_unit.py
index 19b390e..f45b56a 100644
--- a/tests/test_unit.py
+++ b/tests/test_unit.py
@@ -4,6 +4,8 @@
import pytest
from sqlalchemy import MetaData
+from sqlalchemy.dialects import postgresql
+from sqlalchemy.ext.asyncio import AsyncSession
from faststream_outbox import (
ConstantRetry,
@@ -29,6 +31,20 @@ def _make_broker(engine: object | None = None, table_name: str = "outbox") -> Ou
return OutboxBroker(outbox_table=table)
+def _make_session_mock(*, scalar_return: object = 42) -> AsyncMock:
+ """
+ Build an AsyncSession mock whose ``execute()`` returns a sync MagicMock.
+
+ AsyncMock(spec=AsyncSession) makes the return_value of execute() default to an
+ AsyncMock — so ``result.scalar()`` would itself return a coroutine. The broker's
+ real CursorResult.scalar() is sync, so override the return_value to a MagicMock.
+ """
+ session = AsyncMock(spec=AsyncSession)
+ session.execute.return_value = MagicMock()
+ session.execute.return_value.scalar.return_value = scalar_return
+ return session
+
+
# --- make_outbox_table ---
@@ -48,11 +64,22 @@ def test_make_outbox_table_columns_present() -> None:
"last_attempt_at",
"acquired_at",
"acquired_token",
+ "timer_id",
}
assert {c.name for c in t.columns} == expected
assert t.name == "my_outbox"
+def test_make_outbox_table_declares_timer_unique_index() -> None:
+ metadata = MetaData()
+ t = make_outbox_table(metadata, table_name="my_outbox")
+ timer_idx = next(idx for idx in t.indexes if idx.name == "my_outbox_timer_id_uq")
+ assert timer_idx.unique is True
+ assert [c.name for c in timer_idx.columns] == ["queue", "timer_id"]
+ # Partial-index predicate ensures non-timer rows aren't constrained
+ assert timer_idx.dialect_options["postgresql"]["where"] is not None
+
+
def test_make_outbox_table_attaches_to_metadata() -> None:
metadata = MetaData()
t = make_outbox_table(metadata, table_name="outbox")
@@ -276,10 +303,8 @@ async def test_broker_publish_rejects_non_async_session() -> None:
async def test_broker_publish_executes_insert_then_pg_notify_on_session() -> None:
- from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415
-
broker = _make_broker()
- session = AsyncMock(spec=AsyncSession)
+ session = _make_session_mock()
await broker.publish({"order_id": 1}, queue="orders", session=session)
# Two execute calls: the INSERT, then SELECT pg_notify(...).
assert session.execute.await_count == 2
@@ -296,10 +321,8 @@ async def test_broker_publish_executes_insert_then_pg_notify_on_session() -> Non
async def test_broker_publish_does_not_commit() -> None:
- from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415
-
broker = _make_broker()
- session = AsyncMock(spec=AsyncSession)
+ session = _make_session_mock()
await broker.publish(b"x", queue="orders", session=session)
session.commit.assert_not_called()
session.flush.assert_not_called()
@@ -373,6 +396,145 @@ async def test_broker_publish_batch_no_bodies_is_noop() -> None:
session.execute.assert_not_called()
+async def test_broker_publish_rejects_activate_in_and_at_together() -> None:
+ broker = _make_broker()
+ session = _make_session_mock()
+ with pytest.raises(ValueError, match="activate_in / activate_at"):
+ await broker.publish(
+ b"x",
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(seconds=1),
+ activate_at=_dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=1),
+ )
+
+
+async def test_broker_publish_with_activate_in_skips_notify() -> None:
+ broker = _make_broker()
+ session = _make_session_mock()
+ await broker.publish(b"x", queue="orders", session=session, activate_in=_dt.timedelta(seconds=30))
+ # Only the INSERT — no NOTIFY for future-dated rows.
+ assert session.execute.await_count == 1
+ insert_stmt = session.execute.await_args_list[0].args[0]
+ assert "INSERT INTO" in str(insert_stmt)
+ assert "next_attempt_at" in str(insert_stmt)
+
+
+async def test_broker_publish_with_activate_at_skips_notify() -> None:
+ broker = _make_broker()
+ session = _make_session_mock()
+ fire = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=5)
+ await broker.publish(b"x", queue="orders", session=session, activate_at=fire)
+ assert session.execute.await_count == 1
+ params = session.execute.await_args_list[0].args[0].compile().params
+ assert params["next_attempt_at"] == fire
+
+
+async def test_broker_publish_with_timer_id_uses_on_conflict() -> None:
+ broker = _make_broker()
+ session = _make_session_mock()
+ await broker.publish(b"x", queue="orders", session=session, timer_id="email-123")
+ insert_stmt = session.execute.await_args_list[0].args[0]
+ compiled = insert_stmt.compile(dialect=postgresql.dialect())
+ sql = str(compiled)
+ assert "INSERT INTO" in sql
+ assert "ON CONFLICT" in sql
+ assert "DO NOTHING" in sql
+ assert compiled.params["timer_id"] == "email-123"
+
+
+async def test_broker_publish_returns_none_on_timer_id_conflict() -> None:
+ broker = _make_broker()
+ # Simulate ON CONFLICT DO NOTHING returning no rows: scalar() → None
+ session = _make_session_mock(scalar_return=None)
+ result = await broker.publish(b"x", queue="orders", session=session, timer_id="dup")
+ assert result is None
+ # NOTIFY skipped when nothing was inserted.
+ assert session.execute.await_count == 1
+
+
+async def test_broker_publish_batch_rejects_activate_in_and_at_together() -> None:
+ broker = _make_broker()
+ session = AsyncMock(spec=AsyncSession)
+ with pytest.raises(ValueError, match="activate_in / activate_at"):
+ await broker.publish_batch(
+ b"a",
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(seconds=1),
+ activate_at=_dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=1),
+ )
+
+
+async def test_broker_publish_batch_with_activate_in_skips_notify() -> None:
+ broker = _make_broker()
+ session = AsyncMock(spec=AsyncSession)
+ await broker.publish_batch(
+ b"a",
+ b"b",
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(seconds=30),
+ )
+ # Insert only — no NOTIFY for future-dated batch.
+ assert session.execute.await_count == 1
+ rows = session.execute.await_args_list[0].args[1]
+ assert all("next_attempt_at" in r for r in rows)
+
+
+async def test_broker_publish_batch_with_activate_at_skips_notify() -> None:
+ broker = _make_broker()
+ session = AsyncMock(spec=AsyncSession)
+ fire = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=5)
+ await broker.publish_batch(b"a", b"b", queue="orders", session=session, activate_at=fire)
+ # No NOTIFY: future-dated rows.
+ assert session.execute.await_count == 1
+ rows = session.execute.await_args_list[0].args[1]
+ assert all(r["next_attempt_at"] == fire for r in rows)
+
+
+async def test_broker_publish_batch_does_not_accept_timer_id() -> None:
+ """publish_batch must not expose per-row dedup; timer_id is a publish()-only kwarg."""
+ broker = _make_broker()
+ session = AsyncMock(spec=AsyncSession)
+ with pytest.raises(TypeError, match="timer_id"):
+ await broker.publish_batch(
+ b"a",
+ queue="orders",
+ session=session,
+ timer_id="x", # ty: ignore[unknown-argument]
+ )
+
+
+async def test_broker_cancel_timer_rejects_non_async_session() -> None:
+ broker = _make_broker()
+ with pytest.raises(TypeError, match="AsyncSession"):
+ await broker.cancel_timer(queue="orders", timer_id="x", session=object()) # ty: ignore[invalid-argument-type]
+
+
+async def test_broker_cancel_timer_emits_delete_with_lease_guard() -> None:
+ broker = _make_broker()
+ session = AsyncMock(spec=AsyncSession)
+ session.execute.return_value.rowcount = 1
+ deleted = await broker.cancel_timer(queue="orders", timer_id="email-1", session=session)
+ assert deleted is True
+ delete_stmt = session.execute.await_args_list[0].args[0]
+ sql = str(delete_stmt)
+ assert "DELETE" in sql
+ assert "acquired_token IS NULL" in sql
+ params = delete_stmt.compile().params
+ assert params["queue_1"] == "orders"
+ assert params["timer_id_1"] == "email-1"
+
+
+async def test_broker_cancel_timer_returns_false_when_nothing_deleted() -> None:
+ broker = _make_broker()
+ session = AsyncMock(spec=AsyncSession)
+ session.execute.return_value.rowcount = 0
+ deleted = await broker.cancel_timer(queue="orders", timer_id="x", session=session)
+ assert deleted is False
+
+
async def test_broker_publish_batch_executes_single_insert_for_many_rows() -> None:
from sqlalchemy.ext.asyncio import AsyncSession # noqa: PLC0415