From 79363e65419d729e6e1c93a27f63b25949a4f9d9 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 16:22:24 +0300 Subject: [PATCH 1/6] refactor(scheduling): consolidate activate-args helpers into _scheduling.py Move is_future_dated, resolve_next_attempt_client_side and validate_activate_args into a new stdlib-only leaf module so the real and fake publish paths share one copy. publish_batch now calls the resolver instead of inlining it. Co-Authored-By: Claude Opus 4.8 (1M context) --- faststream_outbox/_scheduling.py | 53 +++++++++++++++++++++++++ faststream_outbox/broker.py | 25 ------------ faststream_outbox/publisher/producer.py | 24 ++--------- faststream_outbox/testing.py | 23 +++++------ 4 files changed, 67 insertions(+), 58 deletions(-) create mode 100644 faststream_outbox/_scheduling.py diff --git a/faststream_outbox/_scheduling.py b/faststream_outbox/_scheduling.py new file mode 100644 index 0000000..9aebbd1 --- /dev/null +++ b/faststream_outbox/_scheduling.py @@ -0,0 +1,53 @@ +""" +Pure activate-args resolution + validation, shared by the real and fake publish paths. + +``activate_in`` / ``activate_at`` are the user-facing scheduling knobs. Turning them +into a single ``next_attempt_at`` (client clock) and deciding whether a row is +future-dated (so NOTIFY can be skipped) are the same rules everywhere, so they live +here as pure functions a caller invokes with its own ``now``. + +Leaf module: depends only on the stdlib so ``producer``, ``broker`` and ``testing`` +can all import *from* it without a cycle. The single-publish real path computes +``next_attempt_at`` server-side via ``make_interval`` (clock-skew-safe) and does not +use ``resolve_next_attempt_client_side`` — only the batch and fake paths do. 
+""" + +import datetime as _dt + + +def is_future_dated( + activate_in: _dt.timedelta | None, + activate_at: _dt.datetime | None, + now: _dt.datetime, +) -> bool: + """Whether a row is genuinely future-dated (so NOTIFY is skipped — polling fires it at the gate).""" + if activate_in is not None: + return activate_in > _dt.timedelta(0) + if activate_at is not None: + return activate_at > now + return False + + +def resolve_next_attempt_client_side( + activate_in: _dt.timedelta | None, + activate_at: _dt.datetime | None, + now: _dt.datetime, +) -> _dt.datetime | None: + """Resolve activate_in / activate_at to a single ``next_attempt_at`` value (client clock).""" + if activate_in is not None: + return now + activate_in + return activate_at + + +def validate_activate_args( + method_name: str, + activate_in: _dt.timedelta | None, + activate_at: _dt.datetime | None, +) -> None: + """Mutex + tz-aware checks shared by the test fakes. Real broker delegates to ``OutboxPublishCommand``.""" + if activate_in is not None and activate_at is not None: + msg = f"{method_name} accepts at most one of activate_in / activate_at" + raise ValueError(msg) + if activate_at is not None and activate_at.tzinfo is None: + msg = f"{method_name} requires activate_at to be timezone-aware" + raise ValueError(msg) diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index a1056ed..264cd04 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -30,7 +30,6 @@ from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession -from faststream_outbox._time import utcnow from faststream_outbox.client import AbstractOutboxClient, OutboxClient, _row_to_message from faststream_outbox.configs import LastExceptionRenderer, OutboxBrokerConfig from faststream_outbox.message import OutboxInnerMessage @@ -56,30 +55,6 @@ from faststream_outbox.subscriber.usecase import OutboxSubscriber -def _validate_activate_args( - method_name: str, - activate_in: 
_dt.timedelta | None, - activate_at: _dt.datetime | None, -) -> None: - """Mutex + tz-aware checks shared by the test fakes. Real broker delegates to ``OutboxPublishCommand``.""" - if activate_in is not None and activate_at is not None: - msg = f"{method_name} accepts at most one of activate_in / activate_at" - raise ValueError(msg) - if activate_at is not None and activate_at.tzinfo is None: - msg = f"{method_name} requires activate_at to be timezone-aware" - raise ValueError(msg) - - -def _compute_next_at_client_side( - activate_in: _dt.timedelta | None, - activate_at: _dt.datetime | None, -) -> _dt.datetime | None: - """Resolve activate_in / activate_at to a single ``next_attempt_at`` value (client clock).""" - if activate_in is not None: - return utcnow() + activate_in - return activate_at - - def _spec_url(engine: "AsyncEngine | None", outbox_table: "Table") -> list[str]: """ AsyncAPI server URL(s) for the broker spec. diff --git a/faststream_outbox/publisher/producer.py b/faststream_outbox/publisher/producer.py index b54ad1b..eb51888 100644 --- a/faststream_outbox/publisher/producer.py +++ b/faststream_outbox/publisher/producer.py @@ -8,7 +8,6 @@ ``AsyncSession`` so the row commits atomically with the caller's domain writes. 
""" -import datetime as _dt import time import typing @@ -17,6 +16,7 @@ from sqlalchemy import Float, Table, bindparam, func, insert, text from sqlalchemy.dialects.postgresql import insert as pg_insert +from faststream_outbox._scheduling import is_future_dated, resolve_next_attempt_client_side from faststream_outbox._time import utcnow from faststream_outbox.envelope import _encode_payload from faststream_outbox.metrics import MetricsRecorder, _noop_recorder, _safe_emit @@ -32,19 +32,6 @@ from faststream._internal.types import AsyncCallable, CustomCallable -def _is_future_dated( - activate_in: _dt.timedelta | None, - activate_at: _dt.datetime | None, - now: _dt.datetime, -) -> bool: - """Whether a row is genuinely future-dated (so NOTIFY is skipped — polling fires it at the gate).""" - if activate_in is not None: - return activate_in > _dt.timedelta(0) - if activate_at is not None: - return activate_at > now - return False - - class OutboxProducer: """``ProducerProto[OutboxPublishCommand]`` — runs encode + insert + NOTIFY on caller's session.""" @@ -149,7 +136,7 @@ async def _do_publish( # Skip NOTIFY only when the row is genuinely future-dated. A past activate_at # (e.g. a recovered idempotency token) is immediately eligible — fire NOTIFY. now = utcnow() - is_future = _is_future_dated(cmd.activate_in, cmd.activate_at, now) + is_future = is_future_dated(cmd.activate_in, cmd.activate_at, now) if cmd.timer_id is not None: stmt = ( @@ -178,10 +165,7 @@ async def publish_batch(self, cmd: OutboxPublishCommand) -> None: # column-level SQL expressions, and a few-ms drift versus the DB is # harmless for user-supplied scheduling. Retries still use server time. 
now = utcnow() - if cmd.activate_in is not None: - next_at: _dt.datetime | None = now + cmd.activate_in - else: - next_at = cmd.activate_at + next_at = resolve_next_attempt_client_side(cmd.activate_in, cmd.activate_at, now) rows: list[dict[str, typing.Any]] = [] total_size = 0 start_perf = time.perf_counter() @@ -197,7 +181,7 @@ async def publish_batch(self, cmd: OutboxPublishCommand) -> None: rows.append(row) await cmd.session.execute(insert(self._table), rows) # Skip NOTIFY only when genuinely future-dated; past times are eligible. - if not _is_future_dated(cmd.activate_in, cmd.activate_at, now): + if not is_future_dated(cmd.activate_in, cmd.activate_at, now): await self._notify(cmd.session, cmd.queue) except Exception as exc: self._emit_metric( diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 1bacfad..89f8ac3 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -21,12 +21,9 @@ from faststream._internal.testing.broker import TestBroker, patch_broker_calls +from faststream_outbox._scheduling import resolve_next_attempt_client_side, validate_activate_args from faststream_outbox._time import utcnow -from faststream_outbox.broker import ( - OutboxBroker, - _compute_next_at_client_side, - _validate_activate_args, -) +from faststream_outbox.broker import OutboxBroker from faststream_outbox.client import AbstractOutboxClient from faststream_outbox.envelope import _encode_payload from faststream_outbox.message import OutboxInnerMessage @@ -415,8 +412,8 @@ def __init__( self._run_loops = run_loops async def publish(self, cmd: OutboxPublishCommand) -> int | None: - _validate_activate_args("broker.publish", cmd.activate_in, cmd.activate_at) - next_at = _compute_next_at_client_side(cmd.activate_in, cmd.activate_at) + validate_activate_args("broker.publish", cmd.activate_in, cmd.activate_at) + next_at = resolve_next_attempt_client_side(cmd.activate_in, cmd.activate_at, utcnow()) return await _fake_publish_one( 
self._fake_client, self._broker, @@ -431,8 +428,8 @@ async def publish(self, cmd: OutboxPublishCommand) -> int | None: ) async def publish_batch(self, cmd: OutboxPublishCommand) -> None: - _validate_activate_args("broker.publish_batch", cmd.activate_in, cmd.activate_at) - next_at = _compute_next_at_client_side(cmd.activate_in, cmd.activate_at) + validate_activate_args("broker.publish_batch", cmd.activate_in, cmd.activate_at) + next_at = resolve_next_attempt_client_side(cmd.activate_in, cmd.activate_at, utcnow()) await _fake_publish_many( self._fake_client, self._broker, @@ -478,8 +475,8 @@ async def fake_publish( # production and from ``publisher.publish()`` / ``OutboxResponse``, which require a # real AsyncSession; tests that assert that contract must use those paths. del session - _validate_activate_args("broker.publish", activate_in, activate_at) - next_at = _compute_next_at_client_side(activate_in, activate_at) + validate_activate_args("broker.publish", activate_in, activate_at) + next_at = resolve_next_attempt_client_side(activate_in, activate_at, utcnow()) return await _fake_publish_one( fake_client, broker, @@ -512,10 +509,10 @@ async def fake_publish_batch( activate_at: _dt.datetime | None = None, ) -> None: del session - _validate_activate_args("broker.publish_batch", activate_in, activate_at) + validate_activate_args("broker.publish_batch", activate_in, activate_at) if not bodies: return - next_at = _compute_next_at_client_side(activate_in, activate_at) + next_at = resolve_next_attempt_client_side(activate_in, activate_at, utcnow()) await _fake_publish_many( fake_client, broker, From 5cd4801b914723dcfedb142b6690decfe8b4d1cd Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 16:39:04 +0300 Subject: [PATCH 2/6] test(client): add contract suite run against both fake and real adapters One parametrized scenario module pins the shared AbstractOutboxClient surface (fetch / delete_with_lease / mark_pending_with_lease + DLQ) against both the 
in-memory fake and real Postgres (auto-skipped without a DB). A per-adapter harness hides substrate differences; scheduling is seeded as server-side make_interval offsets, matching the existing predicate-parity idiom. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/test_client_contract.py | 402 ++++++++++++++++++++++++++++++++++ 1 file changed, 402 insertions(+) create mode 100644 tests/test_client_contract.py diff --git a/tests/test_client_contract.py b/tests/test_client_contract.py new file mode 100644 index 0000000..4ed869b --- /dev/null +++ b/tests/test_client_contract.py @@ -0,0 +1,402 @@ +""" +Cross-adapter contract for ``AbstractOutboxClient``. + +The two adapters — ``OutboxClient`` (SQL/Postgres) and ``FakeOutboxClient`` +(in-memory) — implement the same rules in different substrates: the real client runs +eligibility / lease cutoff / retry timing *inside Postgres*; the fake runs the +equivalents in Python. Those implementations cannot share code, so this module pins +them to one behavioural contract instead. Every scenario runs against **both** the +fake (everywhere) and real Postgres (auto-skipped when unreachable). + +Scope is exactly the shared ``AbstractOutboxClient`` surface: ``fetch``, +``delete_with_lease``, ``mark_pending_with_lease`` (+ the DLQ side-effect). +``cancel_timer`` and ``timer_id`` insert-dedup live on the broker / producer, not on +the client interface, so they are covered by ``test_integration.py`` / +``test_fake.py``, not here. + +A per-adapter harness hides the substrate differences (how a row is seeded, which +connection a terminal write needs) so the scenarios read adapter-agnostically. +Scheduling is expressed as **offsets from now** seeded server-side on the real path +(``now() + make_interval(...)``) — the same clock-skew-free idiom the existing +fake/real predicate-parity test uses. Expectations are hand-specified, never computed +from a shared helper, so neither adapter can pass trivially against itself. 
+ +The one drift this cannot catch: an in-process test can't manufacture cross-host +DB-vs-worker clock skew, so the real client's server-side ``make_interval`` clock +authority stays a documented invariant (CLAUDE.md), not an assertion here. +""" + +import datetime as _dt +import os +import uuid +from collections.abc import AsyncIterator + +import pytest +from sqlalchemy import MetaData, Table, insert, select, text +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from faststream_outbox import make_dlq_table, make_outbox_table +from faststream_outbox._time import utcnow +from faststream_outbox.client import OutboxClient +from faststream_outbox.message import OutboxInnerMessage +from faststream_outbox.testing import FakeOutboxClient + + +pytestmark = pytest.mark.asyncio + +PG_DSN = os.environ.get("POSTGRES_DSN", "postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") + + +class _FakeHarness: + """Adapter-agnostic harness backed by an in-memory ``FakeOutboxClient``.""" + + kind = "fake" + + def __init__(self) -> None: + self._client = FakeOutboxClient() + + async def seed( + self, + *, + queue: str = "q", + payload: bytes = b"x", + next_attempt_offset: float = -1.0, + timer_id: str | None = None, + leased_token: uuid.UUID | None = None, + acquired_age: float | None = None, + deliveries_count: int = 0, + ) -> int: + rid = self._client.feed( + queue=queue, + payload=payload, + next_attempt_at=utcnow() + _dt.timedelta(seconds=next_attempt_offset), + timer_id=timer_id, + ) + assert rid is not None + row = next(r for r in self._client.rows if r.id == rid) + row.deliveries_count = deliveries_count + if leased_token is not None: + row.acquired_token = leased_token + row.acquired_at = utcnow() - _dt.timedelta(seconds=acquired_age or 0.0) + return rid + + async def fetch(self, queues: list[str], *, limit: int, lease_ttl_seconds: float) -> list[OutboxInnerMessage]: + return await self._client.fetch(None, queues, limit=limit, 
lease_ttl_seconds=lease_ttl_seconds) + + async def delete( + self, + message_id: int, + token: uuid.UUID, + *, + dlq_payload: dict[str, object] | None = None, + ) -> bool: + return await self._client.delete_with_lease(None, message_id, token, dlq_payload=dlq_payload) + + async def mark_pending( + self, + message_id: int, + token: uuid.UUID, + *, + delay_seconds: float, + attempts_count: int, + first_attempt_at: _dt.datetime, + last_attempt_at: _dt.datetime, + ) -> bool: + return await self._client.mark_pending_with_lease( + None, + message_id, + token, + delay_seconds=delay_seconds, + attempts_count=attempts_count, + first_attempt_at=first_attempt_at, + last_attempt_at=last_attempt_at, + ) + + async def get_row(self, message_id: int) -> dict[str, object] | None: + row = next((r for r in self._client.rows if r.id == message_id), None) + if row is None: + return None + return { + "id": row.id, + "queue": row.queue, + "payload": bytes(row.payload), + "acquired_token": row.acquired_token, + "acquired_at": row.acquired_at, + "attempts_count": row.attempts_count, + "deliveries_count": row.deliveries_count, + "next_attempt_at": row.next_attempt_at, + "timer_id": row.timer_id, + } + + async def dlq_rows(self) -> list[dict[str, object]]: + return [dict(r) for r in self._client.dlq_rows] + + +class _RealHarness: + """Adapter-agnostic harness backed by a real ``OutboxClient`` against Postgres.""" + + kind = "real" + + def __init__(self, engine: AsyncEngine, table: Table, dlq: Table) -> None: + self._engine = engine + self._table = table + self._dlq = dlq + self._client = OutboxClient(engine, table, dlq_table=dlq) + + async def seed( + self, + *, + queue: str = "q", + payload: bytes = b"x", + next_attempt_offset: float = -1.0, + timer_id: str | None = None, + leased_token: uuid.UUID | None = None, + acquired_age: float | None = None, + deliveries_count: int = 0, + ) -> int: + # Server-side ``now()`` arithmetic (not a Python literal) keeps the offsets + # clock-skew-free and 
never lets the column default sneak in. + values: dict[str, object] = { + "queue": queue, + "payload": payload, + "next_attempt_at": text("now() + make_interval(secs => :nas)").bindparams(nas=next_attempt_offset), + "deliveries_count": deliveries_count, + } + if timer_id is not None: + values["timer_id"] = timer_id + if leased_token is not None: + values["acquired_token"] = leased_token + values["acquired_at"] = text("now() - make_interval(secs => :aas)").bindparams(aas=acquired_age or 0.0) + async with self._engine.begin() as conn: + result = await conn.execute(insert(self._table).values(**values).returning(self._table.c.id)) + return result.scalar_one() + + async def fetch(self, queues: list[str], *, limit: int, lease_ttl_seconds: float) -> list[OutboxInnerMessage]: + async with self._engine.connect() as conn: + return await self._client.fetch(conn, queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds) + + async def delete( + self, + message_id: int, + token: uuid.UUID, + *, + dlq_payload: dict[str, object] | None = None, + ) -> bool: + async with self._engine.connect() as raw: + writer = await raw.execution_options(isolation_level="AUTOCOMMIT") + return await self._client.delete_with_lease(writer, message_id, token, dlq_payload=dlq_payload) + + async def mark_pending( + self, + message_id: int, + token: uuid.UUID, + *, + delay_seconds: float, + attempts_count: int, + first_attempt_at: _dt.datetime, + last_attempt_at: _dt.datetime, + ) -> bool: + async with self._engine.connect() as raw: + writer = await raw.execution_options(isolation_level="AUTOCOMMIT") + return await self._client.mark_pending_with_lease( + writer, + message_id, + token, + delay_seconds=delay_seconds, + attempts_count=attempts_count, + first_attempt_at=first_attempt_at, + last_attempt_at=last_attempt_at, + ) + + async def get_row(self, message_id: int) -> dict[str, object] | None: + async with self._engine.connect() as conn: + result = await 
conn.execute(select(self._table).where(self._table.c.id == message_id)) + mapping = result.mappings().first() + if mapping is None: + return None + row = dict(mapping) + row["payload"] = bytes(row["payload"]) + return row + + async def dlq_rows(self) -> list[dict[str, object]]: + async with self._engine.connect() as conn: + result = await conn.execute(select(self._dlq)) + rows = [dict(m) for m in result.mappings().all()] + for row in rows: + row["payload"] = bytes(row["payload"]) + return rows + + +_Harness = _FakeHarness | _RealHarness + + +@pytest.fixture(params=["fake", "real"]) +async def contract(request: pytest.FixtureRequest) -> AsyncIterator[_Harness]: + """Yield a harness over each adapter. The ``real`` param auto-skips without Postgres.""" + if request.param == "fake": + yield _FakeHarness() + return + engine = create_async_engine(PG_DSN, future=True) + try: + async with engine.connect() as conn: + await conn.exec_driver_sql("SELECT 1") + except Exception as exc: # noqa: BLE001 # pragma: no cover + await engine.dispose() + pytest.skip(f"Postgres not available at {PG_DSN}: {exc}") + metadata = MetaData() + suffix = uuid.uuid4().hex[:12] + table = make_outbox_table(metadata, table_name=f"test_ctr_outbox_{suffix}") + dlq = make_dlq_table(metadata, table_name=f"test_ctr_dlq_{suffix}") + async with engine.begin() as conn: + await conn.run_sync(metadata.create_all) + try: + yield _RealHarness(engine, table, dlq) + finally: + async with engine.begin() as conn: + await conn.run_sync(metadata.drop_all) + await engine.dispose() + + +# --- fetch ----------------------------------------------------------------- + + +async def test_fetch_claims_unleased_row(contract: _Harness) -> None: + rid = await contract.seed(queue="orders", payload=b"a") + msgs = await contract.fetch(["orders"], limit=10, lease_ttl_seconds=60.0) + assert [m.id for m in msgs] == [rid] + assert msgs[0].acquired_token is not None + assert msgs[0].acquired_at is not None + assert 
msgs[0].deliveries_count == 1 + + +async def test_fetch_skips_future_dated(contract: _Harness) -> None: + await contract.seed(queue="orders", next_attempt_offset=3600.0) + msgs = await contract.fetch(["orders"], limit=10, lease_ttl_seconds=60.0) + assert msgs == [] + + +async def test_fetch_reclaims_expired_lease(contract: _Harness) -> None: + old_token = uuid.uuid4() + rid = await contract.seed(queue="orders", leased_token=old_token, acquired_age=120.0, deliveries_count=1) + msgs = await contract.fetch(["orders"], limit=10, lease_ttl_seconds=60.0) + assert [m.id for m in msgs] == [rid] + assert msgs[0].acquired_token != old_token + assert msgs[0].deliveries_count == 2 + + +async def test_fetch_skips_fresh_lease(contract: _Harness) -> None: + await contract.seed(queue="orders", leased_token=uuid.uuid4(), acquired_age=1.0, deliveries_count=1) + msgs = await contract.fetch(["orders"], limit=10, lease_ttl_seconds=60.0) + assert msgs == [] + + +async def test_fetch_selects_oldest_under_limit(contract: _Harness) -> None: + # FIFO *selection* under contention is the contract; within-batch *return order* + # is unspecified for the real client (F2-09 — the outer UPDATE…RETURNING is + # unordered), so assert on the chosen set, not its order. 
+ rid_a = await contract.seed(queue="orders", next_attempt_offset=-10.0) + rid_b = await contract.seed(queue="orders", next_attempt_offset=-20.0) + rid_c = await contract.seed(queue="orders", next_attempt_offset=-30.0) + msgs = await contract.fetch(["orders"], limit=2, lease_ttl_seconds=60.0) + assert {m.id for m in msgs} == {rid_c, rid_b} + assert rid_a not in {m.id for m in msgs} + + +async def test_fetch_respects_limit(contract: _Harness) -> None: + for _ in range(3): + await contract.seed(queue="orders") + msgs = await contract.fetch(["orders"], limit=2, lease_ttl_seconds=60.0) + assert len(msgs) == 2 + + +async def test_fetch_filters_by_queue(contract: _Harness) -> None: + await contract.seed(queue="orders") + await contract.seed(queue="other") + msgs = await contract.fetch(["orders"], limit=10, lease_ttl_seconds=60.0) + assert {m.queue for m in msgs} == {"orders"} + + +# --- delete_with_lease ----------------------------------------------------- + + +async def test_delete_deletes_on_token_match(contract: _Harness) -> None: + token = uuid.uuid4() + rid = await contract.seed(leased_token=token, acquired_age=1.0) + assert await contract.delete(rid, token) is True + assert await contract.get_row(rid) is None + + +async def test_delete_noop_on_token_mismatch(contract: _Harness) -> None: + rid = await contract.seed(leased_token=uuid.uuid4(), acquired_age=1.0) + assert await contract.delete(rid, uuid.uuid4()) is False + assert await contract.get_row(rid) is not None + + +async def test_delete_noop_on_unleased_row(contract: _Harness) -> None: + rid = await contract.seed() + assert await contract.delete(rid, uuid.uuid4()) is False + assert await contract.get_row(rid) is not None + + +async def test_delete_with_dlq_materializes_audit_row(contract: _Harness) -> None: + token = uuid.uuid4() + rid = await contract.seed(queue="orders", payload=b"body", timer_id="t-1", leased_token=token, acquired_age=1.0) + deleted = await contract.delete( + rid, + token, + 
dlq_payload={"failure_reason": "boom", "last_exception": "Traceback..."}, + ) + assert deleted is True + assert await contract.get_row(rid) is None + dlq = await contract.dlq_rows() + assert len(dlq) == 1 + assert dlq[0]["original_id"] == rid + assert dlq[0]["queue"] == "orders" + assert dlq[0]["payload"] == b"body" + assert dlq[0]["failure_reason"] == "boom" + assert dlq[0]["last_exception"] == "Traceback..." + assert dlq[0]["timer_id"] == "t-1" + + +# --- mark_pending_with_lease ----------------------------------------------- + + +async def test_mark_pending_reschedules_on_token_match(contract: _Harness) -> None: + token = uuid.uuid4() + now = utcnow() + rid = await contract.seed(next_attempt_offset=-5.0, leased_token=token, acquired_age=1.0) + ok = await contract.mark_pending( + rid, + token, + delay_seconds=60.0, + attempts_count=1, + first_attempt_at=now, + last_attempt_at=now, + ) + assert ok is True + row = await contract.get_row(rid) + assert row is not None + assert row["acquired_token"] is None + assert row["acquired_at"] is None + assert row["attempts_count"] == 1 + next_attempt_at = row["next_attempt_at"] + assert isinstance(next_attempt_at, _dt.datetime) + assert next_attempt_at > utcnow() + + +async def test_mark_pending_noop_on_token_mismatch(contract: _Harness) -> None: + token = uuid.uuid4() + now = utcnow() + rid = await contract.seed(leased_token=token, acquired_age=1.0) + ok = await contract.mark_pending( + rid, + uuid.uuid4(), + delay_seconds=60.0, + attempts_count=1, + first_attempt_at=now, + last_attempt_at=now, + ) + assert ok is False + row = await contract.get_row(rid) + assert row is not None + assert row["acquired_token"] == token From 7c6ba39dd4c25935a804153043b9a4bb2e258836 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 16:44:30 +0300 Subject: [PATCH 3/6] refactor(dlq): derive outbox->DLQ projection from one declarative map _DLQ_PROJECTION (+ _DLQ_INJECTED_COLUMNS) in schema.py is now the single source the real DLQ 
CTE and the fake's delete_with_lease both build from, replacing the hand-kept parity (the P9 comment) between the two substrates. Co-Authored-By: Claude Opus 4.8 (1M context) --- faststream_outbox/client.py | 28 ++++++++++++++++++---------- faststream_outbox/schema.py | 20 ++++++++++++++++++++ faststream_outbox/testing.py | 28 ++++++++++++---------------- 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index 40f7e97..9d629e3 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -43,6 +43,8 @@ from faststream_outbox._import_checker import is_alembic_installed from faststream_outbox.message import OutboxInnerMessage from faststream_outbox.schema import ( + _DLQ_INJECTED_COLUMNS, + _DLQ_PROJECTION, _LEASE_CK_SUFFIX, _LEASE_IDX_SUFFIX, _PENDING_IDX_SUFFIX, @@ -329,21 +331,27 @@ def _build_dlq_cte_stmt( # wrote to a same-named search_path table (B10). outbox_name = preparer.format_table(self._table) dlq_name = preparer.format_table(dlq_table) - # S608: outbox_name / dlq_name come from application-defined SQLAlchemy - # Table objects (not request input) and are quoted via the dialect's - # identifier preparer — values flow through :bindparam placeholders. + # Column lists derive from _DLQ_PROJECTION (projected pairs first, then the + # injected failure-context columns) so the real CTE and the fake stay in lockstep + # off one source. INSERT and SELECT share the same order, so the named columns map + # positionally. 
+ returning_cols = ", ".join(out for out, _ in _DLQ_PROJECTION) + insert_cols = ", ".join([dlq for _, dlq in _DLQ_PROJECTION] + list(_DLQ_INJECTED_COLUMNS)) + select_exprs = ", ".join( + [out for out, _ in _DLQ_PROJECTION] + [f":{col}" for col in _DLQ_INJECTED_COLUMNS], + ) + # S608: outbox_name / dlq_name come from application-defined SQLAlchemy Table + # objects (not request input) and are quoted via the dialect's identifier preparer; + # the column names come from _DLQ_PROJECTION constants. Values flow through + # :bindparam placeholders. cte_sql = ( f"WITH deleted AS (" # noqa: S608 f"DELETE FROM {outbox_name} " f"WHERE id = :message_id AND acquired_token = :acquired_token " - f"RETURNING id, queue, payload, headers, deliveries_count, created_at, timer_id" - f") " - f"INSERT INTO {dlq_name} (" - f"original_id, queue, payload, headers, deliveries_count, created_at, " - f"failure_reason, last_exception, timer_id" + f"RETURNING {returning_cols}" f") " - f"SELECT id, queue, payload, headers, deliveries_count, created_at, " - f":failure_reason, :last_exception, timer_id " + f"INSERT INTO {dlq_name} ({insert_cols}) " + f"SELECT {select_exprs} " f"FROM deleted" ) sql = text(cte_sql) diff --git a/faststream_outbox/schema.py b/faststream_outbox/schema.py index d4f985f..46e0385 100644 --- a/faststream_outbox/schema.py +++ b/faststream_outbox/schema.py @@ -155,6 +155,26 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table return table +# Outbox-row columns copied verbatim into the DLQ archive row on terminal failure, as +# ``(outbox_column, dlq_column)`` pairs. The single source both the real client's DLQ +# CTE (``OutboxClient._build_dlq_cte_stmt``) and the fake (``FakeOutboxClient.delete_with_lease``) +# build from, so a DLQ column change is one edit here instead of hand-kept parity in two +# substrates. ``failed_at`` is not listed — it rides the DLQ column's ``server_default``. +_DLQ_PROJECTION: tuple[tuple[str, str], ...] 
= ( + ("id", "original_id"), + ("queue", "queue"), + ("payload", "payload"), + ("headers", "headers"), + ("deliveries_count", "deliveries_count"), + ("created_at", "created_at"), + ("timer_id", "timer_id"), +) + +# DLQ columns supplied by the caller (failure context), not copied from the outbox row. +# Their names double as the bind-parameter names on the real path. +_DLQ_INJECTED_COLUMNS: tuple[str, ...] = ("failure_reason", "last_exception") + + def make_dlq_table(metadata: "MetaData", table_name: str = "outbox_dlq") -> Table: """ Build the dead-letter-queue ``Table`` and attach it to *metadata*. diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 89f8ac3..2b0eded 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -29,6 +29,7 @@ from faststream_outbox.message import OutboxInnerMessage from faststream_outbox.metrics import _safe_emit from faststream_outbox.response import _REQUEST_UNSUPPORTED_MSG, OutboxPublishCommand +from faststream_outbox.schema import _DLQ_INJECTED_COLUMNS, _DLQ_PROJECTION if typing.TYPE_CHECKING: @@ -175,22 +176,17 @@ async def delete_with_lease( # delete where the real client no-ops. if row.id == message_id and acquired_token is not None and row.acquired_token == acquired_token: if dlq_payload is not None: - # Mirror the real CTE side-effect: DLQ row materializes in the - # same call as the DELETE, before the row is removed. - self._dlq_rows.append( - { - "original_id": row.id, - "queue": row.queue, - "payload": row.payload, - "headers": row.headers, - "deliveries_count": row.deliveries_count, - "created_at": row.created_at, - "failed_at": utcnow(), - "failure_reason": dlq_payload["failure_reason"], - "last_exception": dlq_payload["last_exception"], - "timer_id": row.timer_id, # P9 parity with the real DLQ CTE - }, - ) + # Mirror the real CTE side-effect: DLQ row materializes in the same + # call as the DELETE, before the row is removed. 
Built from the shared + # _DLQ_PROJECTION so the fake can't drift from the real CTE; failed_at + # mirrors the DLQ column server_default. + dlq_row: dict[str, typing.Any] = { + dlq_col: getattr(row, outbox_col) for outbox_col, dlq_col in _DLQ_PROJECTION + } + dlq_row["failed_at"] = utcnow() + for col in _DLQ_INJECTED_COLUMNS: + dlq_row[col] = dlq_payload[col] + self._dlq_rows.append(dlq_row) del self._rows[i] return True return False From ed3a0c5ee761360c1179b420bef8b074760bf725 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 16:53:03 +0300 Subject: [PATCH 4/6] test: drop per-adapter cases now owned by the client contract suite Remove the basic single-adapter fetch/delete/mark scenarios and the manual fake-vs-real predicate-parity test that test_client_contract.py now subsumes on both adapters. Real-specific coverage (SKIP LOCKED concurrency, autocommit round-trip, DB-clock retry timing, DLQ CTE) stays. Coverage holds at 100%. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/test_fake.py | 8 -- tests/test_integration.py | 213 -------------------------------------- 2 files changed, 221 deletions(-) diff --git a/tests/test_fake.py b/tests/test_fake.py index 17a3451..77a2a2a 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -1197,14 +1197,6 @@ async def test_fake_client_feed_timer_id_different_queues_allowed() -> None: assert len(fake.rows) == 2 -async def test_fake_client_future_next_attempt_is_invisible_to_fetch() -> None: - fake = FakeOutboxClient() - future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(minutes=5) - fake.feed(queue="q", payload=b"x", next_attempt_at=future) - rows = await fake.fetch(None, ["q"], limit=10, lease_ttl_seconds=60.0) - assert rows == [] - - async def test_fake_client_cancel_timer_removes_unleased_row() -> None: fake = FakeOutboxClient() fake.feed(queue="q", payload=b"x", timer_id="email-1") diff --git a/tests/test_integration.py b/tests/test_integration.py index e9fad93..a46c6e7 100644 --- 
a/tests/test_integration.py +++ b/tests/test_integration.py @@ -2,7 +2,6 @@ import asyncio import datetime as _dt -import json import logging import uuid from collections.abc import Mapping @@ -26,7 +25,6 @@ from faststream_outbox.client import OutboxClient from faststream_outbox.envelope import _encode_payload as encode_payload from faststream_outbox.publisher.fake import OutboxFakePublisher -from faststream_outbox.testing import FakeOutboxClient pytestmark = pytest.mark.asyncio @@ -121,29 +119,6 @@ async def test_ping_succeeds(pg_engine, outbox_table) -> None: assert await client.ping() is True -async def test_fetch_returns_pending_rows_only(pg_engine, outbox_table) -> None: - async with pg_engine.begin() as conn: - for i in range(3): - await conn.execute(insert(outbox_table).values(queue="orders", payload=f"p-{i}".encode())) - client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as conn: - rows = await client.fetch(conn, ["orders"], limit=10, lease_ttl_seconds=60.0) - assert len(rows) == 3 - assert {r.queue for r in rows} == {"orders"} - assert all(r.acquired_token is not None for r in rows) - - -async def test_fetch_skips_other_queues(pg_engine, outbox_table) -> None: - async with pg_engine.begin() as conn: - await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) - await conn.execute(insert(outbox_table).values(queue="other", payload=b"y")) - client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as conn: - rows = await client.fetch(conn, ["orders"], limit=10, lease_ttl_seconds=60.0) - assert len(rows) == 1 - assert rows[0].queue == "orders" - - async def test_two_concurrent_fetches_dont_double_claim(pg_engine, outbox_table) -> None: async with pg_engine.begin() as conn: for i in range(20): @@ -202,35 +177,6 @@ async def test_fetch_skips_rows_locked_by_another_transaction(pg_engine, outbox_ assert len(claimed_ids) == 10 # exactly the rows the holder didn't lock -async def 
test_delete_with_lease_succeeds_with_correct_token(pg_engine: AsyncEngine, outbox_table: Table) -> None: - async with pg_engine.begin() as conn: - await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) - client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as fetch_conn: - rows = await client.fetch(fetch_conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - assert len(rows) == 1 - # delete_with_lease expects an AUTOCOMMIT-configured conn (the production writer - # conn from _open_worker_resources) — same shape here. - async with pg_engine.connect() as raw_conn: - writer_conn = await raw_conn.execution_options(isolation_level="AUTOCOMMIT") - deleted = await client.delete_with_lease(writer_conn, rows[0].id, rows[0].acquired_token) # ty: ignore[invalid-argument-type] - assert deleted is True - assert await _row_count(pg_engine, outbox_table) == 0 - - -async def test_delete_with_wrong_token_is_noop(pg_engine: AsyncEngine, outbox_table: Table) -> None: - async with pg_engine.begin() as conn: - await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) - client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as fetch_conn: - rows = await client.fetch(fetch_conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - async with pg_engine.connect() as raw_conn: - writer_conn = await raw_conn.execution_options(isolation_level="AUTOCOMMIT") - deleted = await client.delete_with_lease(writer_conn, rows[0].id, uuid.uuid4()) # wrong token - assert deleted is False - assert await _row_count(pg_engine, outbox_table) == 1 # row still there - - async def test_writer_connection_autocommit_round_trip(pg_engine: AsyncEngine, outbox_table: Table) -> None: """ Autocommit-configured writer conn runs ``delete_with_lease`` end-to-end against real Postgres. 
@@ -263,66 +209,6 @@ async def test_writer_connection_autocommit_round_trip(pg_engine: AsyncEngine, o assert await _row_count(pg_engine, outbox_table) == 0 -async def test_mark_pending_with_lease(pg_engine: AsyncEngine, outbox_table: Table) -> None: - async with pg_engine.begin() as conn: - await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) - client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as fetch_conn: - rows = await client.fetch(fetch_conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - msg = rows[0] - # mark_pending_with_lease expects an AUTOCOMMIT-configured conn (production writer conn). - async with pg_engine.connect() as raw_conn: - writer_conn = await raw_conn.execution_options(isolation_level="AUTOCOMMIT") - updated = await client.mark_pending_with_lease( - writer_conn, - msg.id, - msg.acquired_token, # ty: ignore[invalid-argument-type] - delay_seconds=600.0, # 10 minutes in the future - attempts_count=1, - first_attempt_at=_dt.datetime.now(tz=_dt.UTC), - last_attempt_at=_dt.datetime.now(tz=_dt.UTC), - ) - assert updated is True - # Refetch — should be empty because next_attempt_at is in the future - async with pg_engine.connect() as fetch_conn: - rows2 = await client.fetch(fetch_conn, ["orders"], limit=10, lease_ttl_seconds=60.0) - assert rows2 == [] - - -async def test_mark_pending_with_wrong_token_is_noop(pg_engine: AsyncEngine, outbox_table: Table) -> None: - """ - T1: mark_pending_with_lease must filter on acquired_token (the UPDATE half of the lease invariant). - - Mirrors test_delete_with_wrong_token_is_noop for the retry path: a slow handler whose lease - was reclaimed by a newer fetch must NOT reschedule/release the row the new holder now owns. - Deleting ``acquired_token == :token`` from the WHERE would update by id alone. 
- """ - async with pg_engine.begin() as conn: - await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) - client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as fetch_conn: - rows = await client.fetch(fetch_conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - msg = rows[0] - async with pg_engine.connect() as raw_conn: - writer_conn = await raw_conn.execution_options(isolation_level="AUTOCOMMIT") - updated = await client.mark_pending_with_lease( - writer_conn, - msg.id, - uuid.uuid4(), # WRONG token — not the lease holder - delay_seconds=600.0, - attempts_count=1, - first_attempt_at=_dt.datetime.now(tz=_dt.UTC), - last_attempt_at=_dt.datetime.now(tz=_dt.UTC), - ) - assert updated is False - # The row is untouched: still leased under the ORIGINAL token, attempts_count not bumped, - # next_attempt_at not pushed out — the mutation (id-only WHERE) would change all three. - async with pg_engine.connect() as conn: - row = (await conn.execute(select(outbox_table).where(outbox_table.c.id == msg.id))).mappings().one() - assert row["attempts_count"] == 0 - assert row["acquired_token"] == msg.acquired_token # lease holder unchanged (not released) - - async def test_mark_pending_with_lease_uses_db_clock(pg_engine: AsyncEngine, outbox_table: Table) -> None: """next_attempt_at must be computed server-side as now() + delay, not from the worker's clock.""" async with pg_engine.begin() as conn: @@ -356,40 +242,6 @@ async def test_mark_pending_with_lease_uses_db_clock(pg_engine: AsyncEngine, out assert db_before + _dt.timedelta(seconds=delay) <= next_at <= db_after + _dt.timedelta(seconds=delay) -async def test_expired_lease_is_reclaimed_by_fetch(pg_engine, outbox_table) -> None: - """A row whose lease has expired must be re-claimed by the next fetch with a fresh token.""" - async with pg_engine.begin() as conn: - await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) - client = OutboxClient(pg_engine, 
outbox_table) - async with pg_engine.connect() as conn: - first = await client.fetch(conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - assert first - original_token = first[0].acquired_token - # Backdate acquired_at so the lease is now considered expired by a 60s TTL. - backdate_sql = f"UPDATE \"{outbox_table.name}\" SET acquired_at = NOW() - INTERVAL '1 hour'" # noqa: S608 - async with pg_engine.begin() as conn: - await conn.exec_driver_sql(backdate_sql) - async with pg_engine.connect() as conn: - second = await client.fetch(conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - assert len(second) == 1 - assert second[0].id == first[0].id - assert second[0].acquired_token != original_token # fresh lease holder - - -async def test_unexpired_lease_is_not_reclaimed_by_fetch(pg_engine, outbox_table) -> None: - """A still-valid lease must NOT be reclaimed by another fetch.""" - async with pg_engine.begin() as conn: - await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) - client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as conn: - first = await client.fetch(conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - assert first - # Lease was just set; a fresh fetch with a 60s TTL must find nothing. - async with pg_engine.connect() as conn: - second = await client.fetch(conn, ["orders"], limit=1, lease_ttl_seconds=60.0) - assert second == [] - - async def test_end_to_end_subscriber_delivers_inserted_row(pg_engine, outbox_table) -> None: received: list[dict] = [] broker = OutboxBroker(pg_engine, outbox_table=outbox_table) @@ -1019,71 +871,6 @@ def _on_checkout(*_args: object) -> None: ) -async def test_fake_and_real_fetch_agree_on_eligibility_predicate(pg_engine, outbox_table) -> None: - """ - T1 — fake/real predicate parity across the five eligibility states. 
- - ``OutboxClient.fetch`` (SQL) and ``FakeOutboxClient.fetch`` (Python) compute - eligibility independently; without this test, drift between them is silent — - unit tests green, production red. The five states exercised: unleased, - future-dated, leased-fresh (within TTL), leased-expired (past TTL), - queue-mismatch. - """ - lease_ttl = 60.0 - queues_to_fetch = ["orders"] - # Each spec packs label, queue, next_attempt offset (s), and acquired-age (s) or None. - specs: list[tuple[str, str, float, float | None]] = [ - ("unleased", "orders", -1.0, None), - ("future", "orders", 60.0, None), - ("leased-fresh", "orders", -1.0, 5.0), - ("leased-expired", "orders", -1.0, 120.0), - ("queue-mismatch", "other", -1.0, None), - ] - expected_eligible = {"unleased", "leased-expired"} - - # Real side — server-side ``now()`` arithmetic keeps the offsets clock-skew-free. - session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) - async with session_factory() as session, session.begin(): - for label, queue, offset, acq_age in specs: - payload, headers = encode_payload({"label": label}) - values: dict[str, object] = { - "queue": queue, - "payload": payload, - "headers": headers, - "next_attempt_at": text("now() + make_interval(secs => :next_s)").bindparams(next_s=offset), - } - if acq_age is not None: - values["acquired_token"] = uuid.uuid4() - values["acquired_at"] = text("now() - make_interval(secs => :acq_s)").bindparams(acq_s=acq_age) - await session.execute(insert(outbox_table).values(**values)) - real_client = OutboxClient(pg_engine, outbox_table) - async with pg_engine.connect() as conn: - real_rows = await real_client.fetch(conn, queues_to_fetch, limit=100, lease_ttl_seconds=lease_ttl) - real_labels = {json.loads(r.payload)["label"] for r in real_rows} - - # Fake side — separate ID space; correlate by payload label. Offsets (>=1s) - # dwarf any plausible Python/DB clock skew, so the comparison is stable. 
- now = _dt.datetime.now(_dt.UTC) - fake = FakeOutboxClient() - for label, queue, offset, acq_age in specs: - payload, headers = encode_payload({"label": label}) - fake.feed( - queue=queue, - payload=payload, - headers=headers, - next_attempt_at=now + _dt.timedelta(seconds=offset), - ) - if acq_age is not None: - fake.rows[-1].acquired_token = uuid.uuid4() - fake.rows[-1].acquired_at = now - _dt.timedelta(seconds=acq_age) - fake_rows = await fake.fetch(None, queues_to_fetch, limit=100, lease_ttl_seconds=lease_ttl) - fake_labels = {json.loads(r.payload)["label"] for r in fake_rows} - - assert real_labels == fake_labels == expected_eligible, ( - f"predicate drift — real={real_labels} fake={fake_labels} expected={expected_eligible}" - ) - - async def test_concurrent_drain_with_eight_workers_holds_pool_bounded(pg_engine, outbox_table) -> None: """ T2 — multi-worker drain: 500 rows + max_workers=8 keeps pool checkouts O(workers). From 9015314414e7efaaa897e93f5228327b3388155f Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 17:00:13 +0300 Subject: [PATCH 5/6] docs: record DLQ projection + client contract; close out change Promote the conclusions into CLAUDE.md (User-owned schema / DLQ / Test broker) and architecture/{dlq,test-broker}.md alongside the code, and mark the client-rules-kernel change shipped. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 6 +- architecture/dlq.md | 2 + architecture/test-broker.md | 6 + .../design.md | 197 ++++++++++++++++ .../2026-06-23.01-client-rules-kernel/plan.md | 218 ++++++++++++++++++ 5 files changed, 428 insertions(+), 1 deletion(-) create mode 100644 planning/changes/2026-06-23.01-client-rules-kernel/design.md create mode 100644 planning/changes/2026-06-23.01-client-rules-kernel/plan.md diff --git a/CLAUDE.md b/CLAUDE.md index e52723d..7caa9c5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -86,6 +86,8 @@ There is **no `state` column**: a row is "available" iff `acquired_token IS NULL Atomicity: `delete_with_lease` switches to a single CTE `WITH deleted AS (DELETE … RETURNING …) INSERT INTO SELECT …` — preserves writer-connection autocommit + lease-token guard. INSERT failure rolls back DELETE, so DLQ misconfiguration surfaces as outbox growth + `lease_lost` spikes, not silent audit loss. +**DLQ projection.** The outbox→DLQ column mapping is `_DLQ_PROJECTION` (+ `_DLQ_INJECTED_COLUMNS`) in `schema.py` — one declarative `(outbox_col, dlq_col)` list that the real CTE (`OutboxClient._build_dlq_cte_stmt`) and the fake (`FakeOutboxClient.delete_with_lease`) both build their column lists from. Adding a DLQ column is one edit there, not hand-kept parity across SQL and Python. + `OutboxInnerMessage.terminal_failure_reason` is set on three paths: `allow_delivery` False → `"max_deliveries"`, `_nack` exhausted → `"retry_terminal"`, `_reject` → `"rejected"`. **Branch on `terminal_failure_reason` BEFORE `last_exception`** in `dispatch_one` so manual `await msg.reject()` (no exception raised) routes to `nacked_terminal(reason="rejected")`, not `acked`. **The `DLQFailureReason` `Literal` (`message.py`) is the public contract** — operator queries / dashboards key off these values; changes are API-breaking. 
@@ -107,7 +109,7 @@ Per subscriber: **Connection budget.** Each subscriber holds `max_workers + 1` SQLAlchemy pool connections steady-state + one raw asyncpg connection for LISTEN. Size the pool for `Σ subscribers × (max_workers + 1)` or startup blocks on checkout — the asyncpg LISTEN connection lives **outside** the pool, so it does not count toward pool sizing. **Per process** — Postgres `max_connections` must cover `replicas × Σ subscribers × (max_workers + 2)`: the `max_workers + 1` pool connections **plus** the out-of-pool asyncpg LISTEN connection. Undersize it and rolling deploys hit `FATAL: too many connections`. -**NOTIFY semantics.** `broker.publish` / `publish_batch` emit `SELECT pg_notify('outbox_', queue)` on the caller's session right after the INSERT, **except** when future-dated or `timer_id` conflict no-op'd the insert. NOTIFY is transactional — atomicity with the row is automatic; rolled-back transactions silently drop it. The future-dated decision is one shared `_is_future_dated(activate_in, activate_at, now)`; `activate_at`'s comparison and the `publish_batch` `next_attempt_at` are **worker-clock-relative** (unlike `activate_in`'s server-side `make_interval`), so under worker/DB clock skew NOTIFY may fire slightly early/late — polling backstops it (F2-04/F2-05). NOTIFYs emitted **during a fetch-loop reconnect/backoff window are lost** (LISTEN is not durable); latency degrades to the poll interval until the next tick — a latency, not a correctness, gap (F1-07). Channel naming is `outbox_`. Postgres limits identifiers to 63 bytes; `make_outbox_table` **raises `ValueError`** when the longest derived identifier — an index name like `
<table>_pending_idx`, longer than the NOTIFY channel itself — would exceed it — so over-long table names (~>51 bytes) are rejected at construction, not silently degraded to polling. +**NOTIFY semantics.** `broker.publish` / `publish_batch` emit `SELECT pg_notify('outbox_<table>', queue)` on the caller's session right after the INSERT, **except** when future-dated or `timer_id` conflict no-op'd the insert. NOTIFY is transactional — atomicity with the row is automatic; rolled-back transactions silently drop it. The future-dated decision is one shared `is_future_dated(activate_in, activate_at, now)` (in the stdlib-only leaf `_scheduling.py`, alongside `resolve_next_attempt_client_side` and `validate_activate_args` — the pure activate-args helpers the real and fake publish paths share); `activate_at`'s comparison and the `publish_batch` `next_attempt_at` are **worker-clock-relative** (unlike `activate_in`'s server-side `make_interval`), so under worker/DB clock skew NOTIFY may fire slightly early/late — polling backstops it (F2-04/F2-05). NOTIFYs emitted **during a fetch-loop reconnect/backoff window are lost** (LISTEN is not durable); latency degrades to the poll interval until the next tick — a latency, not a correctness, gap (F1-07). Channel naming is `outbox_<table>`. Postgres limits identifiers to 63 bytes; `make_outbox_table` **raises `ValueError`** when the longest derived identifier — an index name like `<table>_pending_idx`, longer than the NOTIFY channel itself — would exceed it — so over-long table names (~>51 bytes) are rejected at construction, not silently degraded to polling. ### Lease-token invariant — load-bearing @@ -146,6 +148,8 @@ Deep dive: `architecture/drain.md`. `FakeOutboxClient.validate_schema()` raises `NotImplementedError` — a silent pass would let users ship broken schemas while tests stay green. Use a real `OutboxClient(real_engine, table)` for schema validation tests. +**Client contract.** `OutboxClient` (SQL) and `FakeOutboxClient` (Python) implement the same rules in different substrates — they can't share code, so `tests/test_client_contract.py` pins them to one behavioural contract: a single parametrized scenario module run against **both** adapters (fake everywhere; real Postgres auto-skipped when unreachable) over the shared `AbstractOutboxClient` surface (`fetch` / `delete_with_lease` / `mark_pending_with_lease` + DLQ). Drift fails a test instead of shipping. It pins *structural* drift only — an in-process test can't manufacture cross-host DB-vs-worker clock skew, so the real client's server-side `make_interval` clock authority stays a documented invariant, not an assertion. `cancel_timer` and `timer_id` insert-dedup are broker/producer concerns, not on the client interface, so they live in `test_integration.py` / `test_fake.py`. + **Session leniency.** The fake `publish` / `publish_batch` / `cancel_timer` / `fetch_unprocessed` all `del session` — any value (incl. `None`) is accepted, diverging from production's `isinstance(session, AsyncSession)` `TypeError` (F4-09). Tests that need to assert the session contract must use the real `OutboxClient` / a real `AsyncSession`, not the fake. `OutboxResponse` is **not** faked, so its eager session/queue/activate validation does fire under the test broker. **Gotcha:** subscribers registered via `OutboxRouter` (then `broker.include_router(router)`) live on the router, not `broker._subscribers`. 
Walk `broker.subscribers` (the property) for full introspection. diff --git a/architecture/dlq.md b/architecture/dlq.md index 034e2e4..1810d51 100644 --- a/architecture/dlq.md +++ b/architecture/dlq.md @@ -10,6 +10,8 @@ User-facing: `docs/usage/dlq.md`. Invariant summary: `CLAUDE.md` § Opt-in DLQ. `OutboxClient.delete_with_lease` switches to `WITH deleted AS (DELETE … RETURNING …) INSERT INTO SELECT … FROM deleted` when configured. One statement preserves the writer-connection autocommit fast path and the lease-token guard; INSERT failure rolls back the DELETE, so the outbox row stays leased and is reclaimed when the lease expires — **DLQ misconfiguration surfaces as outbox-table growth + `lease_lost` spikes rather than silent audit loss.** Identifiers are quoted via the dialect's `identifier_preparer`; values flow through bind params. +The CTE's `RETURNING` / `INSERT` / `SELECT` column lists are not hand-written — they derive from `_DLQ_PROJECTION` (the `(outbox_col, dlq_col)` pairs copied verbatim) plus `_DLQ_INJECTED_COLUMNS` (`failure_reason`, `last_exception`, supplied by the caller) in `schema.py`. The fake (`FakeOutboxClient.delete_with_lease`) builds its audit dict from the same constants, so the two substrates can't drift on which columns the archive carries — a DLQ column change is one edit in `schema.py`, verified across both adapters by `tests/test_client_contract.py`. `failed_at` is not in the projection; it rides the DLQ column's `server_default`. + ## `terminal_failure_reason` routing `OutboxInnerMessage.terminal_failure_reason` is set on the three failure paths: diff --git a/architecture/test-broker.md b/architecture/test-broker.md index 9a6dcba..5ea524d 100644 --- a/architecture/test-broker.md +++ b/architecture/test-broker.md @@ -28,6 +28,12 @@ Spins up the real `_fetch_loop` / `_worker_loop` against the fake client. 
Requir `FakeOutboxClient.validate_schema()` raises `NotImplementedError` — there is no real DB to validate against, and a silent pass would let users ship broken schemas while their `TestOutboxBroker`-backed tests stay green. Tests that need real schema validation must construct an `OutboxClient(real_engine, table)` against the same DSN the migrations ran against. +## The client contract keeps the fake honest + +`FakeOutboxClient` re-implements the outbox rules in Python because there is no in-process Postgres — eligibility, lease cutoff, retry timing, the NULL-token guard, and the DLQ projection all exist twice (SQL in `OutboxClient`, Python here). The two can't share an implementation (one runs in the database, one in the process), so `tests/test_client_contract.py` couples them by **behaviour** instead: one parametrized scenario module asserts the shared `AbstractOutboxClient` surface (`fetch` / `delete_with_lease` / `mark_pending_with_lease` + DLQ) against *both* adapters — the fake everywhere, real Postgres auto-skipped when unreachable. A per-adapter harness hides substrate differences (how a row is seeded, which connection a terminal write needs); scheduling is seeded as server-side `make_interval` offsets so the comparison is clock-skew-free; expectations are hand-specified so neither adapter passes trivially against itself. + +What it pins is *structural* drift (eligibility states, FIFO selection under contention, the token guard, the DLQ projection). What it deliberately cannot pin: cross-host DB-vs-worker clock skew — an in-process test can't manufacture it — so the real client's server-side clock authority stays a documented invariant. `cancel_timer` and `timer_id` insert-dedup are broker/producer concerns (not on the client interface), so they remain covered in `test_integration.py` / `test_fake.py`, not the contract suite. 
The pure helpers that *can* be shared — the DLQ projection (`_DLQ_PROJECTION` in `schema.py`) and the activate-args resolution (`_scheduling.py`) — are extracted so the fake consumes the same code as production rather than a parallel copy. + ## `_fake_start` skips the parent publisher-iteration loop `TestOutboxBroker._fake_start` deliberately **skips the parent's publisher-iteration loop** (the one that calls `create_publisher_fake_subscriber`). Reason: FastStream's publisher-spy infrastructure mocks the registered handler to forward `publisher.publish()` calls — which conflicts with the outbox's real dispatch path (the fake producer already lands rows in the fake client *and* drives the real handler via `_sync_dispatch`). diff --git a/planning/changes/2026-06-23.01-client-rules-kernel/design.md b/planning/changes/2026-06-23.01-client-rules-kernel/design.md new file mode 100644 index 0000000..6aa6ab2 --- /dev/null +++ b/planning/changes/2026-06-23.01-client-rules-kernel/design.md @@ -0,0 +1,197 @@ +--- +status: shipped +date: 2026-06-23 +slug: client-rules-kernel +summary: Deduplicate the outbox rules between the real and fake clients — extract the genuinely-pure bits (DLQ projection, scheduling resolution) and co-verify the irreducibly-SQL bits with one contract suite run against both adapters. +supersedes: null +superseded_by: null +pr: null +outcome: | + Landed as planned. Pure bits extracted: `_scheduling.py` (activate-args resolution + + validation, shared by real and fake publish paths) and `_DLQ_PROJECTION` in `schema.py` + (single source for the outbox→DLQ column mapping). Irreducibly-SQL rules co-verified by + `tests/test_client_contract.py` — one parametrized module over both adapters (fake + everywhere, real Postgres auto-skipped). 
Scope correction during execution: `cancel_timer` + and `timer_id` insert-dedup are broker/producer concerns, not on `AbstractOutboxClient`, so + they were excluded from the contract suite; within-batch fetch *return order* is unspecified + (F2-09), so the suite asserts FIFO *selection* under LIMIT, not return order. Replace-don't-layer + removed ~220 lines of subsumed per-adapter tests; full suite 543 passed at 100% coverage. +--- + +# Design: Dedupe the outbox rules between the real and fake clients + +## Summary + +`AbstractOutboxClient` already has two real adapters: `OutboxClient` (SQL/Postgres) +and `FakeOutboxClient` (in-memory). The transactional-outbox *rules* — row +eligibility, lease expiry, retry timing, scheduling resolution, the DLQ field +projection — are written down **twice**: once as SQLAlchemy expressions in +`client.py`/`producer.py`, once as Python over `_FakeRow` in `testing.py`. The two +copies are kept in parity by hand and by comment (`P9`, `B10`). This change removes +the duplication where the implementations *can* be shared, and replaces hand-parity +with a machine-checked contract where they cannot. + +It does **not** try to put all the rules behind "one pure module both adapters +call" — that is impossible for the rules the real adapter delegates to Postgres +(see Non-goals). Instead it splits the rules into two groups and treats each on its +own terms: **share the pure ones, co-verify the SQL ones.** + +## Motivation + +The architecture review (HTML report, 2026-06-23) flagged `FakeOutboxClient` as a +"second source of truth." Concretely: + +- **DLQ projection drift.** `_build_dlq_cte_stmt` (`client.py:335–348`) hand-writes + the outbox→DLQ column list; `FakeOutboxClient.delete_with_lease` + (`testing.py:183–196`) hand-writes the same projection as a dict literal. The + `# P9 parity with the real DLQ CTE` comment (`testing.py:194`) is parity + maintained by eyeball. 
Add a DLQ column and you must edit both, in two languages, + or silently drop an audit field. +- **Scheduling resolution scattered + duplicated.** `_is_future_dated` + (`producer.py:35`) and `_compute_next_at_client_side` (`broker.py:73`) are pure, + but live in two files; the real `publish_batch` *inlines* the same client-side + `next_attempt_at` computation at `producer.py:182` instead of calling the helper. +- **No coupling between the two clients' tests.** The fake is tested in + `test_fake.py`, the real in `test_integration.py`. Nothing asserts they agree — + drift surfaces only if a human duplicates each scenario in both suites. The + clock-handling already differs (real uses DB `now()` via `make_interval`; fake + uses worker-clock `utcnow()`), so a green fake test can mask a real-path bug. + +## Non-goals + +- **A single pure "rules kernel" both adapters call.** The real adapter runs + eligibility (`SELECT … FOR UPDATE SKIP LOCKED`), the lease cutoff, and retry + `next_attempt_at` *inside Postgres* — for atomicity, partial-index inference, and + DB-clock authority. A pure `is_eligible(...)` would have exactly one runtime + consumer (the fake): a one-adapter hypothetical seam, i.e. pure indirection. These + rules are deliberately left as two implementations and co-verified instead. +- **Reproducing cross-host clock skew in tests.** An in-process contract test cannot + manufacture DB-vs-worker clock skew. The suite pins *structural* drift; the + clock-authority subtlety stays a documented invariant. +- **Changing any runtime behaviour.** This is a refactor + test addition. With + `dlq_table=None` and no scheduling args, every code path stays bit-for-bit + identical. + +## Design + +The rules split into two groups, handled differently. + +### 1. 
Shared: the DLQ projection (`schema.py`) + +Introduce one declarative projection next to `make_dlq_table`, where the columns are +already defined — locality: a schema change and its projection change become one +edit in one file. + +```python +# schema.py — ordered (outbox_col, dlq_col) pairs copied verbatim on archive. +_DLQ_PROJECTION: tuple[tuple[str, str], ...] = ( + ("id", "original_id"), + ("queue", "queue"), + ("payload", "payload"), + ("headers", "headers"), + ("deliveries_count", "deliveries_count"), + ("created_at", "created_at"), + ("timer_id", "timer_id"), +) +# Injected (not copied from the outbox row): failure_reason, last_exception. +``` + +- `OutboxClient._build_dlq_cte_stmt` builds its `RETURNING` / `INSERT` / `SELECT` + column lists from `_DLQ_PROJECTION` instead of literal SQL text. +- `FakeOutboxClient.delete_with_lease` builds its DLQ dict from `_DLQ_PROJECTION` + instead of a literal dict. + +The seam is the projection map; the two adapters (SQL CTE, dict) are its consumers. +The `P9`/`B10` hand-parity comments are deleted — parity is now structural. + +### 2. Shared: scheduling resolution + activate-args validation (`_scheduling.py`) + +New private leaf module (matching the `_time.py` / `_import_checker.py` convention), +depending only on `_time`: + +```python +# _scheduling.py +def is_future_dated(activate_in, activate_at, now) -> bool: ... +def resolve_next_attempt_client_side(activate_in, activate_at, now) -> datetime | None: ... +def validate_activate_args(method_name, activate_in, activate_at) -> None: ... +``` + +- `is_future_dated` and `resolve_next_attempt_client_side` move out of `producer.py` + / `broker.py`. The real `publish_batch` (`producer.py:175–201`) calls + `resolve_next_attempt_client_side` instead of inlining it. The fake producer and + fake client import from `_scheduling`, not `broker`. 
+- `validate_activate_args` (the mutex + tz subset used by the fakes, currently + `broker.py:59`) moves here too — colocating "everything about resolving and + validating activate-args" under one home. `broker.py` shrinks. +- The single-publish real path keeps its server-side `now() + make_interval(...)` + for `activate_in` (clock-skew safety) — that is not pure and stays in `producer.py`. + +### 3. Co-verified: one client contract suite (`tests/test_client_contract.py`) + +The irreducibly-SQL rules (eligibility, lease cutoff, retry timing, the NULL-token +guard) stay as two implementations but are pinned by **one parametrized scenario +module** run against both adapters. + +- **Parametrization:** `client ∈ {fake, real}`. The fake param runs everywhere; the + real param uses the existing `pg_engine` fixture and **auto-skips when Postgres is + unreachable** (same gate `test_integration.py` uses today). +- **Per-adapter harness fixture** (test-only) bridges the surfaces uniformly: + - `seed_row(**fields)` — fake: `.feed(...)`; real: raw `INSERT` on the table. + - `open_conn()` — fake: yields `None`; real: `engine.connect()`. + - exposes the `client` under test. + + Each scenario reads `harness.seed_row(...)`, `await client.fetch(harness.conn, …)`, + asserts on the observable result — adapter-agnostic. Expectations are + hand-specified (not computed from a shared function), so neither adapter passes + trivially against itself. + +- **Contract the suite pins:** + - `fetch`: claims unleased; skips future-dated (`next_attempt_at > now`); reclaims + expired lease; skips fresh lease; FIFO order `(next_attempt_at, id)`; respects + `limit`; filters by queue set. + - claim side-effects: `acquired_token` / `acquired_at` set, `deliveries_count` + incremented. + - `delete_with_lease`: deletes iff token matches; no-op on mismatch / NULL token; + DLQ row materialized via `_DLQ_PROJECTION` when DLQ configured. 
+ - `mark_pending_with_lease`: reschedules iff token matches; clears lease; sets + attempts / timestamps. + - `cancel_timer`: drops an unleased timer row; refuses a leased one. + - `timer_id` dedup: re-insert of the same `(queue, timer_id)` is a no-op. + +**Replace, don't layer:** scenarios now covered by the contract suite are removed +from `test_fake.py` / `test_integration.py` so the contract is asserted once at the +seam, not echoed per-adapter. Suite-specific behaviour (sync-vs-loop test broker, +real-only schema validation, drain) stays where it is. + +### 4. Documentation (ship-time, in the implementing PR) + +No `CONTEXT.md` — this project records domain/architecture vocabulary in `CLAUDE.md` ++ `architecture/*.md`. At ship time: + +- "DLQ projection" → **User-owned schema** + **DLQ** sections of `CLAUDE.md`, and + `architecture/dlq.md`. +- "client contract" → **Test broker** section of `CLAUDE.md`, and + `architecture/test-broker.md` (it is the thing that keeps the fake honest). + +## Testing + +- New `tests/test_client_contract.py` (parametrized fake + real; real auto-skips + without Postgres) — the primary deliverable. +- Existing `test_fake.py` / `test_integration.py` scenarios that the contract suite + now owns are deleted (replace, don't layer); the rest stay green. +- `just test` (full suite, Postgres 17) and `just lint` clean. Coverage stays at + `--cov-fail-under=100` — the refactor removes code paths, it does not add untested + ones. + +## Risk + +- **DLQ projection refactor changes the CTE SQL text.** Likelihood low, impact high + (terminal failures route through it). Mitigation: the contract suite asserts the + materialized DLQ row field-by-field against both adapters; the real path is + exercised under Postgres in CI. +- **Contract suite hides drift it claims to catch (clock authority).** Documented + explicitly as a non-goal; the residual invariant lives in `CLAUDE.md`. 
The suite + catches structural drift, which is the actual maintenance hazard. +- **Import cycles from the new `_scheduling.py`.** Mitigation: it is a leaf + depending only on the stdlib; `producer`, `broker`, and `testing` import *from* it, + never the reverse. diff --git a/planning/changes/2026-06-23.01-client-rules-kernel/plan.md b/planning/changes/2026-06-23.01-client-rules-kernel/plan.md new file mode 100644 index 0000000..5570a53 --- /dev/null +++ b/planning/changes/2026-06-23.01-client-rules-kernel/plan.md @@ -0,0 +1,218 @@ +--- +status: shipped +date: 2026-06-23 +slug: client-rules-kernel +spec: client-rules-kernel +pr: null +--- + +# client-rules-kernel — implementation plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use +> superpowers:subagent-driven-development (recommended) or +> superpowers:executing-plans to implement this plan task-by-task. Steps +> use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Remove the hand-maintained rule duplication between `OutboxClient` and +`FakeOutboxClient` — share the pure bits, co-verify the SQL bits with one contract +suite — without changing any runtime behaviour. + +**Spec:** [`design.md`](./design.md) + +**Branch:** `refactor/client-rules-kernel` + +**Commit strategy:** Per-task commits. + +--- + +### Task 1: Extract `_scheduling.py` + +**Files:** +- Create: `faststream_outbox/_scheduling.py` +- Modify: `faststream_outbox/publisher/producer.py`, `faststream_outbox/broker.py`, `faststream_outbox/testing.py` + +Pull the three pure activate-args helpers into one leaf module; kill the inlined +copy in `publish_batch`. Pure move — no behaviour change. (Spec §2.) + +- [ ] **Step 1: Create the leaf module** + + New `faststream_outbox/_scheduling.py` depending only on the stdlib (callers pass `now`), with: + `is_future_dated(activate_in, activate_at, now)`, + `resolve_next_attempt_client_side(activate_in, activate_at, now)`, + `validate_activate_args(method_name, activate_in, activate_at)`. 
+ Bodies move verbatim from `producer.py:35` (`_is_future_dated`), `broker.py:73` + (`_compute_next_at_client_side`), `broker.py:59` (`_validate_activate_args`); only the names change (leading underscore dropped, `_compute_next_at_client_side` → `resolve_next_attempt_client_side`). + +- [ ] **Step 2: Rewire consumers** + + - `producer.py`: import from `_scheduling`; delete local `_is_future_dated`; in + `publish_batch` (`producer.py:175–201`) call `resolve_next_attempt_client_side` + instead of inlining `now + cmd.activate_in` / `cmd.activate_at`. + - `broker.py`: delete local `_compute_next_at_client_side` and + `_validate_activate_args`; update any in-module callers to import from `_scheduling`. + - `testing.py`: change the imports at `testing.py:27–28` to source from `_scheduling`. + +- [ ] **Step 3: Verify** + + `uv run pytest tests/test_unit.py tests/test_fake.py --no-cov -q` green. + `grep -rn "_compute_next_at_client_side\|_is_future_dated\|_validate_activate_args" faststream_outbox/` + returns no hits — the old private names are gone and the renamed helpers are defined only in `_scheduling.py`. + +- [ ] **Step 4: Commit** + + ```bash + git add faststream_outbox/_scheduling.py faststream_outbox/publisher/producer.py faststream_outbox/broker.py faststream_outbox/testing.py + git commit -m "refactor(scheduling): consolidate activate-args helpers into _scheduling.py + + Co-Authored-By: Claude Opus 4.8 (1M context) " + ``` + +--- + +### Task 2: Write the client contract suite (green against current code) + +**Files:** +- Create: `tests/test_client_contract.py` +- Modify: `tests/conftest.py` (per-adapter harness fixture) + +Establish the cross-adapter safety net *before* refactoring the DLQ projection, so +Task 3 is guarded. The suite must pass against the current code. (Spec §3.) + +- [ ] **Step 1: Add the harness fixture** + + In `conftest.py`, a fixture parametrized `client ∈ {fake, real}` yielding a harness + exposing `client`, `seed_row(**fields)` (fake: `.feed(...)`; real: raw `INSERT`), + and `open_conn()` (fake: `None`; real: `engine.connect()`). 
The `real` param uses + the existing `pg_engine` fixture and **skips when Postgres is unreachable** (mirror + `test_integration.py`'s skip gate). + +- [ ] **Step 2: Write the contract scenarios** + + In `test_client_contract.py`, adapter-agnostic scenarios covering the contract in + spec §3: `fetch` (unleased / future-dated / expired-lease / fresh-lease / FIFO / + limit / queue filter), claim side-effects, `delete_with_lease` (token match / + mismatch / NULL token / DLQ materialization), `mark_pending_with_lease`, + `cancel_timer`, `timer_id` dedup. Expectations hand-specified, not computed from a + shared function. + +- [ ] **Step 3: Verify both adapters** + + `uv run pytest tests/test_client_contract.py --no-cov -q` green for the fake param. + `just test tests/test_client_contract.py` green for both params (Postgres up). + +- [ ] **Step 4: Commit** + + ```bash + git add tests/test_client_contract.py tests/conftest.py + git commit -m "test(client): add contract suite run against both fake and real adapters + + Co-Authored-By: Claude Opus 4.8 (1M context) " + ``` + +--- + +### Task 3: Extract `_DLQ_PROJECTION` (guarded by Task 2) + +**Files:** +- Modify: `faststream_outbox/schema.py`, `faststream_outbox/client.py`, `faststream_outbox/testing.py` + +Replace the hand-kept outbox→DLQ parity with one declarative projection. (Spec §1.) + +- [ ] **Step 1: Declare the projection** + + In `schema.py`, next to `make_dlq_table`, add `_DLQ_PROJECTION` — ordered + `(outbox_col, dlq_col)` pairs (`id→original_id`, `queue→queue`, `payload`, + `headers`, `deliveries_count`, `created_at`, `timer_id`). Note injected fields + (`failure_reason`, `last_exception`) in a comment. + +- [ ] **Step 2: Consume it in both adapters** + + - `client.py`: `_build_dlq_cte_stmt` (`:335–348`) builds its `RETURNING` / + `INSERT (...)` / `SELECT` column lists from `_DLQ_PROJECTION` (preserve the + identifier-quoting via the dialect preparer). Delete the `B10` hand-parity comment — the literal-list risk it flagged is gone. 
+ - `testing.py`: `delete_with_lease` (`:183–196`) builds the DLQ dict from + `_DLQ_PROJECTION`. Delete the `# P9 parity` comment — parity is now structural. + +- [ ] **Step 3: Verify no drift** + + `just test tests/test_client_contract.py` still green for both params (the suite's + DLQ-materialization assertions now guard the refactor). Full `just test` green. + +- [ ] **Step 4: Commit** + + ```bash + git add faststream_outbox/schema.py faststream_outbox/client.py faststream_outbox/testing.py + git commit -m "refactor(dlq): derive outbox->DLQ projection from one declarative map + + Co-Authored-By: Claude Opus 4.8 (1M context) " + ``` + +--- + +### Task 4: Replace, don't layer — prune redundant tests + +**Files:** +- Modify: `tests/test_fake.py`, `tests/test_integration.py` + +Delete per-adapter scenarios the contract suite now owns; keep suite-specific tests +(sync-vs-loop test broker, real-only schema validation, drain). (Spec §3.) + +- [ ] **Step 1: Identify overlap** + + Find tests in `test_fake.py` / `test_integration.py` whose assertions are now + subsumed by `test_client_contract.py` (eligibility, lease reclaim, token guard, + DLQ projection, timer dedup, cancel_timer). + +- [ ] **Step 2: Delete the overlap; keep the rest** + + Remove the subsumed cases. Leave anything the contract suite cannot express. + +- [ ] **Step 3: Verify coverage holds** + + `just test` green with `--cov-fail-under=100`. If coverage drops, a deleted test + covered a path the contract suite misses — restore it or extend the suite. 
+ +- [ ] **Step 4: Commit** + + ```bash + git add tests/test_fake.py tests/test_integration.py + git commit -m "test: drop per-adapter cases now owned by the client contract suite + + Co-Authored-By: Claude Opus 4.8 (1M context) " + ``` + +--- + +### Task 5: Promote docs (ship-time, in this PR) + +**Files:** +- Modify: `CLAUDE.md`, `architecture/dlq.md`, `architecture/test-broker.md` +- Modify: `planning/changes/2026-06-23.01-client-rules-kernel/design.md` (frontmatter) + +Record the new vocabulary where this project keeps it; close out the change. (Spec §4.) + +- [ ] **Step 1: CLAUDE.md + architecture/** + + - "DLQ projection" → **User-owned schema** + **DLQ** sections of `CLAUDE.md`; + `architecture/dlq.md`. + - "client contract" → **Test broker** section of `CLAUDE.md`; + `architecture/test-broker.md`. Note the documented clock-skew residual (the + contract pins structural drift, not cross-host clock authority). + +- [ ] **Step 2: Close out the change** + + Set `status: shipped`, fill `pr:` and `outcome:` in `design.md` (and `plan.md`). + `just index` to regenerate the listing. + +- [ ] **Step 3: Final verification** + + `just lint` and `just test` clean. 
+ +- [ ] **Step 4: Commit** + + ```bash + git add CLAUDE.md architecture/ planning/ + git commit -m "docs: record DLQ projection + client contract; close out change + + Co-Authored-By: Claude Opus 4.8 (1M context) " + ``` From f811a4c171a68f03561db6cc0eeed325dc6ec333 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 17:04:03 +0300 Subject: [PATCH 6/6] docs(planning): record PR #109 on client-rules-kernel change Co-Authored-By: Claude Opus 4.8 (1M context) --- planning/changes/2026-06-23.01-client-rules-kernel/design.md | 2 +- planning/changes/2026-06-23.01-client-rules-kernel/plan.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/planning/changes/2026-06-23.01-client-rules-kernel/design.md b/planning/changes/2026-06-23.01-client-rules-kernel/design.md index 6aa6ab2..575a54f 100644 --- a/planning/changes/2026-06-23.01-client-rules-kernel/design.md +++ b/planning/changes/2026-06-23.01-client-rules-kernel/design.md @@ -5,7 +5,7 @@ slug: client-rules-kernel summary: Deduplicate the outbox rules between the real and fake clients — extract the genuinely-pure bits (DLQ projection, scheduling resolution) and co-verify the irreducibly-SQL bits with one contract suite run against both adapters. supersedes: null superseded_by: null -pr: null +pr: 109 outcome: | Landed as planned. Pure bits extracted: `_scheduling.py` (activate-args resolution + validation, shared by real and fake publish paths) and `_DLQ_PROJECTION` in `schema.py` diff --git a/planning/changes/2026-06-23.01-client-rules-kernel/plan.md b/planning/changes/2026-06-23.01-client-rules-kernel/plan.md index 5570a53..25f8787 100644 --- a/planning/changes/2026-06-23.01-client-rules-kernel/plan.md +++ b/planning/changes/2026-06-23.01-client-rules-kernel/plan.md @@ -3,7 +3,7 @@ status: shipped date: 2026-06-23 slug: client-rules-kernel spec: client-rules-kernel -pr: null +pr: 109 --- # client-rules-kernel — implementation plan