6 changes: 5 additions & 1 deletion CLAUDE.md
@@ -86,6 +86,8 @@ There is **no `state` column**: a row is "available" iff `acquired_token IS NULL

Atomicity: `delete_with_lease` switches to a single CTE `WITH deleted AS (DELETE … RETURNING …) INSERT INTO <dlq> SELECT …` — preserves writer-connection autocommit + lease-token guard. INSERT failure rolls back DELETE, so DLQ misconfiguration surfaces as outbox growth + `lease_lost` spikes, not silent audit loss.

**DLQ projection.** The outbox→DLQ column mapping is `_DLQ_PROJECTION` (+ `_DLQ_INJECTED_COLUMNS`) in `schema.py` — one declarative `(outbox_col, dlq_col)` list that the real CTE (`OutboxClient._build_dlq_cte_stmt`) and the fake (`FakeOutboxClient.delete_with_lease`) both build their column lists from. Adding a DLQ column is one edit there, not hand-kept parity across SQL and Python.

`OutboxInnerMessage.terminal_failure_reason` is set on three paths: `allow_delivery` False → `"max_deliveries"`, `_nack` exhausted → `"retry_terminal"`, `_reject` → `"rejected"`. **Branch on `terminal_failure_reason` BEFORE `last_exception`** in `dispatch_one` so manual `await msg.reject()` (no exception raised) routes to `nacked_terminal(reason="rejected")`, not `acked`.

**The `DLQFailureReason` `Literal` (`message.py`) is the public contract** — operator queries / dashboards key off these values; changes are API-breaking.
@@ -107,7 +109,7 @@ Per subscriber:

**Connection budget.** Each subscriber holds `max_workers + 1` SQLAlchemy pool connections steady-state + one raw asyncpg connection for LISTEN. Size the pool for `Σ subscribers × (max_workers + 1)` or startup blocks on checkout — the asyncpg LISTEN connection lives **outside** the pool, so it does not count toward pool sizing. **Per process** — Postgres `max_connections` must cover `replicas × Σ subscribers × (max_workers + 2)`: the `max_workers + 1` pool connections **plus** the out-of-pool asyncpg LISTEN connection. Undersize it and rolling deploys hit `FATAL: too many connections`.
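
As a worked instance of the sizing arithmetic above, here is a minimal sketch. The helper names and the sample worker counts are illustrative assumptions, not part of the library; only the two formulas come from the text.

```python
def pool_size(max_workers_per_subscriber):
    """SQLAlchemy pool connections one process needs: sum of (max_workers + 1)."""
    return sum(w + 1 for w in max_workers_per_subscriber)


def postgres_connections(replicas, max_workers_per_subscriber):
    """Server-side budget: pool connections plus one out-of-pool LISTEN connection per subscriber."""
    return replicas * sum(w + 2 for w in max_workers_per_subscriber)


# Hypothetical deployment: two subscribers (max_workers=4 and max_workers=2), three replicas.
workers = [4, 2]
print(pool_size(workers))                # 8
print(postgres_connections(3, workers))  # 30
```

With these numbers each process needs a pool of at least 8, while Postgres `max_connections` must leave room for 30 connections from this service alone.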

**NOTIFY semantics.** `broker.publish` / `publish_batch` emit `SELECT pg_notify('outbox_<table>', queue)` on the caller's session right after the INSERT, **except** when future-dated or `timer_id` conflict no-op'd the insert. NOTIFY is transactional — atomicity with the row is automatic; rolled-back transactions silently drop it. The future-dated decision is one shared `_is_future_dated(activate_in, activate_at, now)`; `activate_at`'s comparison and the `publish_batch` `next_attempt_at` are **worker-clock-relative** (unlike `activate_in`'s server-side `make_interval`), so under worker/DB clock skew NOTIFY may fire slightly early/late — polling backstops it (F2-04/F2-05). NOTIFYs emitted **during a fetch-loop reconnect/backoff window are lost** (LISTEN is not durable); latency degrades to the poll interval until the next tick — a latency, not a correctness, gap (F1-07). Channel naming is `outbox_<table_name>`. Postgres limits identifiers to 63 bytes; `make_outbox_table` **raises `ValueError`** when the longest derived identifier — an index name like `<table>_pending_idx`, longer than the NOTIFY channel itself — would exceed it — so over-long table names (~>51 bytes) are rejected at construction, not silently degraded to polling.
**NOTIFY semantics.** `broker.publish` / `publish_batch` emit `SELECT pg_notify('outbox_<table>', queue)` on the caller's session right after the INSERT, **except** when future-dated or `timer_id` conflict no-op'd the insert. NOTIFY is transactional — atomicity with the row is automatic; rolled-back transactions silently drop it. The future-dated decision is one shared `is_future_dated(activate_in, activate_at, now)` (in the stdlib-only leaf `_scheduling.py`, alongside `resolve_next_attempt_client_side` and `validate_activate_args` — the pure activate-args helpers the real and fake publish paths share); `activate_at`'s comparison and the `publish_batch` `next_attempt_at` are **worker-clock-relative** (unlike `activate_in`'s server-side `make_interval`), so under worker/DB clock skew NOTIFY may fire slightly early/late — polling backstops it (F2-04/F2-05). NOTIFYs emitted **during a fetch-loop reconnect/backoff window are lost** (LISTEN is not durable); latency degrades to the poll interval until the next tick — a latency, not a correctness, gap (F1-07). Channel naming is `outbox_<table_name>`. Postgres limits identifiers to 63 bytes; `make_outbox_table` **raises `ValueError`** when the longest derived identifier — an index name like `<table>_pending_idx`, longer than the NOTIFY channel itself — would exceed it — so over-long table names (~>51 bytes) are rejected at construction, not silently degraded to polling.
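
To make the future-dated rule concrete, the sketch below restates `is_future_dated` as it appears in `_scheduling.py` in this PR (copied so the snippet runs standalone) and evaluates it for a few inputs. The fixed `now` and the offsets are illustrative values only.

```python
import datetime as dt


def is_future_dated(activate_in, activate_at, now):
    # Restated from `_scheduling.py` so this snippet has no package dependency.
    if activate_in is not None:
        return activate_in > dt.timedelta(0)
    if activate_at is not None:
        return activate_at > now
    return False


now = dt.datetime(2024, 1, 1, 12, 0, tzinfo=dt.timezone.utc)  # illustrative clock reading

immediate = is_future_dated(None, None, now)                          # no scheduling: NOTIFY fires
delayed = is_future_dated(dt.timedelta(seconds=30), None, now)        # positive delay: NOTIFY skipped
zero_delay = is_future_dated(dt.timedelta(0), None, now)              # zero delay: NOTIFY fires
past_at = is_future_dated(None, now - dt.timedelta(minutes=5), now)   # past activate_at: NOTIFY fires
future_at = is_future_dated(None, now + dt.timedelta(minutes=5), now) # future activate_at: NOTIFY skipped

print(immediate, delayed, zero_delay, past_at, future_at)  # False True False False True
```

Because `now` is passed in by the caller, the `activate_at` branch compares against the worker's clock, which is where the skew caveat above comes from.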

### Lease-token invariant — load-bearing

@@ -146,6 +148,8 @@ Deep dive: `architecture/drain.md`.

`FakeOutboxClient.validate_schema()` raises `NotImplementedError` — a silent pass would let users ship broken schemas while tests stay green. Use a real `OutboxClient(real_engine, table)` for schema validation tests.

**Client contract.** `OutboxClient` (SQL) and `FakeOutboxClient` (Python) implement the same rules in different substrates — they can't share code, so `tests/test_client_contract.py` pins them to one behavioural contract: a single parametrized scenario module run against **both** adapters (fake everywhere; real Postgres auto-skipped when unreachable) over the shared `AbstractOutboxClient` surface (`fetch` / `delete_with_lease` / `mark_pending_with_lease` + DLQ). Drift fails a test instead of shipping. It pins *structural* drift only — an in-process test can't manufacture cross-host DB-vs-worker clock skew, so the real client's server-side `make_interval` clock authority stays a documented invariant, not an assertion. `cancel_timer` and `timer_id` insert-dedup are broker/producer concerns, not on the client interface, so they live in `test_integration.py` / `test_fake.py`.

**Session leniency.** The fake `publish` / `publish_batch` / `cancel_timer` / `fetch_unprocessed` all `del session` — any value (incl. `None`) is accepted, diverging from production's `isinstance(session, AsyncSession)` `TypeError` (F4-09). Tests that need to assert the session contract must use the real `OutboxClient` / a real `AsyncSession`, not the fake. `OutboxResponse` is **not** faked, so its eager session/queue/activate validation does fire under the test broker.

**Gotcha:** subscribers registered via `OutboxRouter` (then `broker.include_router(router)`) live on the router, not `broker._subscribers`. Walk `broker.subscribers` (the property) for full introspection.
2 changes: 2 additions & 0 deletions architecture/dlq.md
@@ -10,6 +10,8 @@ User-facing: `docs/usage/dlq.md`. Invariant summary: `CLAUDE.md` § Opt-in DLQ.

`OutboxClient.delete_with_lease` switches to `WITH deleted AS (DELETE … RETURNING …) INSERT INTO <dlq> SELECT … FROM deleted` when configured. One statement preserves the writer-connection autocommit fast path and the lease-token guard; INSERT failure rolls back the DELETE, so the outbox row stays leased and is reclaimed when the lease expires — **DLQ misconfiguration surfaces as outbox-table growth + `lease_lost` spikes rather than silent audit loss.** Identifiers are quoted via the dialect's `identifier_preparer`; values flow through bind params.

The CTE's `RETURNING` / `INSERT` / `SELECT` column lists are not hand-written — they derive from `_DLQ_PROJECTION` (the `(outbox_col, dlq_col)` pairs copied verbatim) plus `_DLQ_INJECTED_COLUMNS` (`failure_reason`, `last_exception`, supplied by the caller) in `schema.py`. The fake (`FakeOutboxClient.delete_with_lease`) builds its audit dict from the same constants, so the two substrates can't drift on which columns the archive carries — a DLQ column change is one edit in `schema.py`, verified across both adapters by `tests/test_client_contract.py`. `failed_at` is not in the projection; it rides the DLQ column's `server_default`.
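
The derivation described above can be sketched as follows. The constant values here are assumptions inferred from the hand-written column lists this PR replaces in `client.py`; the real `_DLQ_PROJECTION` and `_DLQ_INJECTED_COLUMNS` live in `schema.py` and may differ. `build_audit_row` is a hypothetical stand-in for what the fake does, not its actual code.

```python
# Assumed values (inferred from the replaced SQL); the real constants are in schema.py.
DLQ_PROJECTION = [
    ("id", "original_id"),
    ("queue", "queue"),
    ("payload", "payload"),
    ("headers", "headers"),
    ("deliveries_count", "deliveries_count"),
    ("created_at", "created_at"),
    ("timer_id", "timer_id"),
]
DLQ_INJECTED_COLUMNS = ("failure_reason", "last_exception")

# SQL side: the same three joins that `_build_dlq_cte_stmt` performs in this PR.
returning_cols = ", ".join(out for out, _ in DLQ_PROJECTION)
insert_cols = ", ".join([dlq for _, dlq in DLQ_PROJECTION] + list(DLQ_INJECTED_COLUMNS))
select_exprs = ", ".join(
    [out for out, _ in DLQ_PROJECTION] + [f":{col}" for col in DLQ_INJECTED_COLUMNS]
)


def build_audit_row(outbox_row, failure_reason, last_exception=None):
    """Hypothetical fake-side equivalent: build the DLQ dict from the same constants."""
    audit = {dlq: outbox_row[out] for out, dlq in DLQ_PROJECTION}
    audit.update(failure_reason=failure_reason, last_exception=last_exception)
    return audit


print(insert_cols)
print(select_exprs)
```

Since `INSERT` and `SELECT` are built from the same ordered pairs, the columns line up positionally, and adding a pair to the list extends both the SQL and the Python dict at once.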

## `terminal_failure_reason` routing

`OutboxInnerMessage.terminal_failure_reason` is set on the three failure paths:
6 changes: 6 additions & 0 deletions architecture/test-broker.md
@@ -28,6 +28,12 @@ Spins up the real `_fetch_loop` / `_worker_loop` against the fake client. Requir

`FakeOutboxClient.validate_schema()` raises `NotImplementedError` — there is no real DB to validate against, and a silent pass would let users ship broken schemas while their `TestOutboxBroker`-backed tests stay green. Tests that need real schema validation must construct an `OutboxClient(real_engine, table)` against the same DSN the migrations ran against.

## The client contract keeps the fake honest

`FakeOutboxClient` re-implements the outbox rules in Python because there is no in-process Postgres — eligibility, lease cutoff, retry timing, the NULL-token guard, and the DLQ projection all exist twice (SQL in `OutboxClient`, Python here). The two can't share an implementation (one runs in the database, one in the process), so `tests/test_client_contract.py` couples them by **behaviour** instead: one parametrized scenario module asserts the shared `AbstractOutboxClient` surface (`fetch` / `delete_with_lease` / `mark_pending_with_lease` + DLQ) against *both* adapters — the fake everywhere, real Postgres auto-skipped when unreachable. A per-adapter harness hides substrate differences (how a row is seeded, which connection a terminal write needs); scheduling is seeded as server-side `make_interval` offsets so the comparison is clock-skew-free; expectations are hand-specified so neither adapter passes trivially against itself.

What it pins is *structural* drift (eligibility states, FIFO selection under contention, the token guard, the DLQ projection). What it deliberately cannot pin: cross-host DB-vs-worker clock skew — an in-process test can't manufacture it — so the real client's server-side clock authority stays a documented invariant. `cancel_timer` and `timer_id` insert-dedup are broker/producer concerns (not on the client interface), so they remain covered in `test_integration.py` / `test_fake.py`, not the contract suite. The pure helpers that *can* be shared — the DLQ projection (`_DLQ_PROJECTION` in `schema.py`) and the activate-args resolution (`_scheduling.py`) — are extracted so the fake consumes the same code as production rather than a parallel copy.
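
The idea of coupling two implementations by behaviour can be illustrated with a toy sketch. Everything below is hypothetical: `DictStore`, `ListStore`, and their method signatures are simplified stand-ins, not the real `AbstractOutboxClient` API, and the scenario is far smaller than the actual contract suite.

```python
import uuid


class DictStore:
    """Toy adapter A: rows kept in a dict of id -> lease token (None means available)."""

    def __init__(self):
        self._rows = {}

    def seed(self, row_id):
        self._rows[row_id] = None

    def fetch(self):
        for row_id in sorted(self._rows):  # lowest id first
            if self._rows[row_id] is None:
                self._rows[row_id] = uuid.uuid4().hex
                return row_id, self._rows[row_id]
        return None

    def delete_with_lease(self, row_id, token):
        # Token guard: a None token never matches, even against an unleased row.
        if token is not None and self._rows.get(row_id) == token:
            del self._rows[row_id]
            return True
        return False


class ListStore:
    """Toy adapter B: the same rules on a different substrate (a sorted list of pairs)."""

    def __init__(self):
        self._rows = []

    def seed(self, row_id):
        self._rows.append([row_id, None])
        self._rows.sort(key=lambda r: r[0])

    def fetch(self):
        for row in self._rows:
            if row[1] is None:
                row[1] = uuid.uuid4().hex
                return row[0], row[1]
        return None

    def delete_with_lease(self, row_id, token):
        for i, (rid, tok) in enumerate(self._rows):
            if rid == row_id and tok is not None and tok == token:
                del self._rows[i]
                return True
        return False


def scenario_fifo_and_token_guard(store):
    """One scenario with hand-specified expectations, run unchanged against each adapter."""
    store.seed(2)
    store.seed(1)
    first = store.fetch()
    assert first[0] == 1                                   # FIFO: lowest id wins
    assert store.delete_with_lease(1, "wrong-token") is False
    assert store.delete_with_lease(1, first[1]) is True    # correct lease token deletes
    assert store.fetch()[0] == 2
    assert store.fetch() is None                           # row 2 is leased, nothing left


for adapter in (DictStore, ListStore):
    scenario_fifo_and_token_guard(adapter())
```

The expectations are written by hand rather than derived from either adapter, so a rule that drifts in one substrate fails the shared scenario instead of passing against its own behaviour.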

## `_fake_start` skips the parent publisher-iteration loop

`TestOutboxBroker._fake_start` deliberately **skips the parent's publisher-iteration loop** (the one that calls `create_publisher_fake_subscriber`). Reason: FastStream's publisher-spy infrastructure mocks the registered handler to forward `publisher.publish()` calls — which conflicts with the outbox's real dispatch path (the fake producer already lands rows in the fake client *and* drives the real handler via `_sync_dispatch`).
53 changes: 53 additions & 0 deletions faststream_outbox/_scheduling.py
@@ -0,0 +1,53 @@
"""
Pure activate-args resolution + validation, shared by the real and fake publish paths.

``activate_in`` / ``activate_at`` are the user-facing scheduling knobs. Turning them
into a single ``next_attempt_at`` (client clock) and deciding whether a row is
future-dated (so NOTIFY can be skipped) are the same rules everywhere, so they live
here as pure functions a caller invokes with its own ``now``.

Leaf module: depends only on the stdlib so ``producer``, ``broker`` and ``testing``
can all import *from* it without a cycle. The single-publish real path computes
``next_attempt_at`` server-side via ``make_interval`` (clock-skew-safe) and does not
use ``resolve_next_attempt_client_side`` — only the batch and fake paths do.
"""

import datetime as _dt


def is_future_dated(
    activate_in: _dt.timedelta | None,
    activate_at: _dt.datetime | None,
    now: _dt.datetime,
) -> bool:
    """Whether a row is genuinely future-dated (so NOTIFY is skipped — polling fires it at the gate)."""
    if activate_in is not None:
        return activate_in > _dt.timedelta(0)
    if activate_at is not None:
        return activate_at > now
    return False


def resolve_next_attempt_client_side(
    activate_in: _dt.timedelta | None,
    activate_at: _dt.datetime | None,
    now: _dt.datetime,
) -> _dt.datetime | None:
    """Resolve activate_in / activate_at to a single ``next_attempt_at`` value (client clock)."""
    if activate_in is not None:
        return now + activate_in
    return activate_at


def validate_activate_args(
    method_name: str,
    activate_in: _dt.timedelta | None,
    activate_at: _dt.datetime | None,
) -> None:
    """Mutex + tz-aware checks shared by the test fakes. Real broker delegates to ``OutboxPublishCommand``."""
    if activate_in is not None and activate_at is not None:
        msg = f"{method_name} accepts at most one of activate_in / activate_at"
        raise ValueError(msg)
    if activate_at is not None and activate_at.tzinfo is None:
        msg = f"{method_name} requires activate_at to be timezone-aware"
        raise ValueError(msg)
25 changes: 0 additions & 25 deletions faststream_outbox/broker.py
@@ -30,7 +30,6 @@
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from faststream_outbox._time import utcnow
from faststream_outbox.client import AbstractOutboxClient, OutboxClient, _row_to_message
from faststream_outbox.configs import LastExceptionRenderer, OutboxBrokerConfig
from faststream_outbox.message import OutboxInnerMessage
@@ -56,30 +55,6 @@
from faststream_outbox.subscriber.usecase import OutboxSubscriber


def _validate_activate_args(
    method_name: str,
    activate_in: _dt.timedelta | None,
    activate_at: _dt.datetime | None,
) -> None:
    """Mutex + tz-aware checks shared by the test fakes. Real broker delegates to ``OutboxPublishCommand``."""
    if activate_in is not None and activate_at is not None:
        msg = f"{method_name} accepts at most one of activate_in / activate_at"
        raise ValueError(msg)
    if activate_at is not None and activate_at.tzinfo is None:
        msg = f"{method_name} requires activate_at to be timezone-aware"
        raise ValueError(msg)


def _compute_next_at_client_side(
    activate_in: _dt.timedelta | None,
    activate_at: _dt.datetime | None,
) -> _dt.datetime | None:
    """Resolve activate_in / activate_at to a single ``next_attempt_at`` value (client clock)."""
    if activate_in is not None:
        return utcnow() + activate_in
    return activate_at


def _spec_url(engine: "AsyncEngine | None", outbox_table: "Table") -> list[str]:
"""
AsyncAPI server URL(s) for the broker spec.
28 changes: 18 additions & 10 deletions faststream_outbox/client.py
@@ -43,6 +43,8 @@
from faststream_outbox._import_checker import is_alembic_installed
from faststream_outbox.message import OutboxInnerMessage
from faststream_outbox.schema import (
_DLQ_INJECTED_COLUMNS,
_DLQ_PROJECTION,
_LEASE_CK_SUFFIX,
_LEASE_IDX_SUFFIX,
_PENDING_IDX_SUFFIX,
@@ -329,21 +331,27 @@ def _build_dlq_cte_stmt(
# wrote to a same-named search_path table (B10).
outbox_name = preparer.format_table(self._table)
dlq_name = preparer.format_table(dlq_table)
# S608: outbox_name / dlq_name come from application-defined SQLAlchemy
# Table objects (not request input) and are quoted via the dialect's
# identifier preparer — values flow through :bindparam placeholders.
# Column lists derive from _DLQ_PROJECTION (projected pairs first, then the
# injected failure-context columns) so the real CTE and the fake stay in lockstep
# off one source. INSERT and SELECT share the same order, so the named columns map
# positionally.
returning_cols = ", ".join(out for out, _ in _DLQ_PROJECTION)
insert_cols = ", ".join([dlq for _, dlq in _DLQ_PROJECTION] + list(_DLQ_INJECTED_COLUMNS))
select_exprs = ", ".join(
[out for out, _ in _DLQ_PROJECTION] + [f":{col}" for col in _DLQ_INJECTED_COLUMNS],
)
# S608: outbox_name / dlq_name come from application-defined SQLAlchemy Table
# objects (not request input) and are quoted via the dialect's identifier preparer;
# the column names come from _DLQ_PROJECTION constants. Values flow through
# :bindparam placeholders.
cte_sql = (
f"WITH deleted AS (" # noqa: S608
f"DELETE FROM {outbox_name} "
f"WHERE id = :message_id AND acquired_token = :acquired_token "
f"RETURNING id, queue, payload, headers, deliveries_count, created_at, timer_id"
f") "
f"INSERT INTO {dlq_name} ("
f"original_id, queue, payload, headers, deliveries_count, created_at, "
f"failure_reason, last_exception, timer_id"
f"RETURNING {returning_cols}"
f") "
f"SELECT id, queue, payload, headers, deliveries_count, created_at, "
f":failure_reason, :last_exception, timer_id "
f"INSERT INTO {dlq_name} ({insert_cols}) "
f"SELECT {select_exprs} "
f"FROM deleted"
)
sql = text(cte_sql)
24 changes: 4 additions & 20 deletions faststream_outbox/publisher/producer.py
@@ -8,7 +8,6 @@
``AsyncSession`` so the row commits atomically with the caller's domain writes.
"""

import datetime as _dt
import time
import typing

@@ -17,6 +16,7 @@
from sqlalchemy import Float, Table, bindparam, func, insert, text
from sqlalchemy.dialects.postgresql import insert as pg_insert

from faststream_outbox._scheduling import is_future_dated, resolve_next_attempt_client_side
from faststream_outbox._time import utcnow
from faststream_outbox.envelope import _encode_payload
from faststream_outbox.metrics import MetricsRecorder, _noop_recorder, _safe_emit
@@ -32,19 +32,6 @@
from faststream._internal.types import AsyncCallable, CustomCallable


def _is_future_dated(
    activate_in: _dt.timedelta | None,
    activate_at: _dt.datetime | None,
    now: _dt.datetime,
) -> bool:
    """Whether a row is genuinely future-dated (so NOTIFY is skipped — polling fires it at the gate)."""
    if activate_in is not None:
        return activate_in > _dt.timedelta(0)
    if activate_at is not None:
        return activate_at > now
    return False


class OutboxProducer:
"""``ProducerProto[OutboxPublishCommand]`` — runs encode + insert + NOTIFY on caller's session."""

@@ -149,7 +136,7 @@ async def _do_publish(
# Skip NOTIFY only when the row is genuinely future-dated. A past activate_at
# (e.g. a recovered idempotency token) is immediately eligible — fire NOTIFY.
now = utcnow()
is_future = _is_future_dated(cmd.activate_in, cmd.activate_at, now)
is_future = is_future_dated(cmd.activate_in, cmd.activate_at, now)

if cmd.timer_id is not None:
stmt = (
@@ -178,10 +165,7 @@ async def publish_batch(self, cmd: OutboxPublishCommand) -> None:
# column-level SQL expressions, and a few-ms drift versus the DB is
# harmless for user-supplied scheduling. Retries still use server time.
now = utcnow()
if cmd.activate_in is not None:
next_at: _dt.datetime | None = now + cmd.activate_in
else:
next_at = cmd.activate_at
next_at = resolve_next_attempt_client_side(cmd.activate_in, cmd.activate_at, now)
rows: list[dict[str, typing.Any]] = []
total_size = 0
start_perf = time.perf_counter()
@@ -197,7 +181,7 @@ async def publish_batch(self, cmd: OutboxPublishCommand) -> None:
rows.append(row)
await cmd.session.execute(insert(self._table), rows)
# Skip NOTIFY only when genuinely future-dated; past times are eligible.
if not _is_future_dated(cmd.activate_in, cmd.activate_at, now):
if not is_future_dated(cmd.activate_in, cmd.activate_at, now):
await self._notify(cmd.session, cmd.queue)
except Exception as exc:
self._emit_metric(