From 94684417b17e03d44c77072f1ac6d1bed8ba7a6c Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 17:28:46 +0300 Subject: [PATCH 1/2] refactor(subscriber): give lease-lost telemetry one home MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _flush_terminal and _flush_retry carried a byte-identical detect->log->emit tail differing only in phase. Extract _emit_lease_lost(row, *, phase); each flush method keeps its explicit rowcount-0 branch. Minimal (A) form — the Lease value object (B) and Lease module (C) were rejected (lease guard is intrinsically SQL). Co-Authored-By: Claude Opus 4.8 (1M context) --- faststream_outbox/subscriber/usecase.py | 72 +++++++++---------- .../change.md | 72 +++++++++++++++++++ 2 files changed, 104 insertions(+), 40 deletions(-) create mode 100644 planning/changes/2026-06-23.02-consolidate-lease-lost/change.md diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 8898a64..d7e8e15 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -718,6 +718,36 @@ async def _safe_flush( return False return await flush(row, writer_conn=writer_conn) + def _emit_lease_lost(self, row: OutboxInnerMessage, *, phase: str) -> None: + """ + Log + record the ``lease_lost`` event shared by the terminal and retry flush paths. + + A terminal/retry write that finds ``rowcount == 0`` means a newer fetch reclaimed + the row (its lease expired mid-handler) — the row will be redelivered, so the + caller must NOT also emit an acked/nacked metric (P17). Both flush paths report + this identically apart from ``phase`` (``"terminal"`` | ``"retry"``). + """ + self._log( + log_level=logging.WARNING, + message=f"Outbox row {row} lease expired before {phase} write; skipping", + extra={ + "event": "lease_lost", + "phase": phase, + "row_id": row.id, + "queue": row.queue, + "deliveries_count": row.deliveries_count, + }, + ) + self._emit_metric( + "lease_lost", + { + **self._base_tags(row.queue), + "phase": phase, + "row_id": row.id, + "deliveries_count": row.deliveries_count, + }, + ) + async def _flush_terminal( self, row: OutboxInnerMessage, @@ -746,26 +776,7 @@ async def _flush_terminal( dlq_payload=dlq_payload, ) if not deleted: - self._log( - log_level=logging.WARNING, - message=f"Outbox row {row} lease expired before delete; skipping", - extra={ - "event": "lease_lost", - "phase": "terminal", - "row_id": row.id, - "queue": row.queue, - "deliveries_count": row.deliveries_count, - }, - ) - self._emit_metric( - "lease_lost", - { - **self._base_tags(row.queue), - "phase": "terminal", - "row_id": row.id, - "deliveries_count": row.deliveries_count, - }, - ) + self._emit_lease_lost(row, phase="terminal") return False if dlq_payload is not None: # P34: omit exception_type when there's no exception (e.g. max_deliveries) @@ -801,26 +812,7 @@ async def _flush_retry( last_attempt_at=row.last_attempt_at, # ty: ignore[invalid-argument-type] ) if not updated: - self._log( - log_level=logging.WARNING, - message=f"Outbox row {row} lease expired before retry update; skipping", - extra={ - "event": "lease_lost", - "phase": "retry", - "row_id": row.id, - "queue": row.queue, - "deliveries_count": row.deliveries_count, - }, - ) - self._emit_metric( - "lease_lost", - { - **self._base_tags(row.queue), - "phase": "retry", - "row_id": row.id, - "deliveries_count": row.deliveries_count, - }, - ) + self._emit_lease_lost(row, phase="retry") return False return True diff --git a/planning/changes/2026-06-23.02-consolidate-lease-lost/change.md b/planning/changes/2026-06-23.02-consolidate-lease-lost/change.md new file mode 100644 index 0000000..e2b9fed --- /dev/null +++ b/planning/changes/2026-06-23.02-consolidate-lease-lost/change.md @@ -0,0 +1,72 @@ +--- +status: shipped +date: 2026-06-23 +slug: consolidate-lease-lost +summary: Consolidate the duplicated lease-lost detect→log→emit block shared by _flush_terminal and _flush_retry into one _emit_lease_lost helper. +supersedes: null +superseded_by: null +pr: null +outcome: | + Landed as the minimal (A) form: `_emit_lease_lost(row, *, phase)` in + subscriber/usecase.py; both flush methods call it. ~36 duplicated lines removed. + Existing lease-lost unit tests (which invoke the flush methods directly) passed + unchanged as the regression guard; full suite 543 passed at 100% coverage. (B) the + Lease value object and (C) the Lease module were evaluated and rejected — see Approach. +--- + +# Change: Give lease-lost telemetry one home + +**Lane:** lightweight — ~15 LOC net, 1 file, no new file, no public-API change, +existing unit tests cover it. + +## Goal + +`_flush_terminal` and `_flush_retry` (`subscriber/usecase.py`) each carried a +byte-identical ~17-line tail — WARNING log (`extra={event, phase, row_id, queue, +deliveries_count}`) + `_emit_metric("lease_lost", …)` — differing only in `phase` +(`"terminal"`/`"retry"`). A change to lease-lost telemetry meant editing both with +divergence risk. Collapse the duplication into one `_emit_lease_lost(row, *, phase)`. + +## Approach + +Extract the log+metric into `OutboxSubscriber._emit_lease_lost(row, *, phase)`. Each +flush method keeps its own explicit `if not landed: self._emit_lease_lost(row, +phase=…); return False` branch, so the control flow a reader wants to see (the +rowcount-0 → redeliver decision) stays visible. The terminal/retry blocks were +identical apart from `phase`, so the helper captures both; the log prose now derives +from `phase` (no test couples to the wording). + +**Scope deliberately minimal — (B) and (C) rejected.** This is candidate #2 from the +2026-06-23 architecture review ("give the lease a home"). The grand forms were +evaluated and dropped: + +- **(B) a `Lease` value object** `(message_id, token)` threaded through the client + interface — rejected: the pair is passed at only two call sites, and it would ripple + through `AbstractOutboxClient` + both adapters + the just-landed + `tests/test_client_contract.py` for modest gain. +- **(C) a full "Lease module"** owning issue/guard/cutoff — rejected for the same + reason [[client-rules-kernel]] left eligibility/lease/retry-timing as two + implementations: the real client runs the lease guard *in SQL* + (`WHERE acquired_token = :token`), so a pure Lease module would have one Python + consumer (the subscriber) — a hypothetical seam, i.e. indirection. + +The only genuinely duplicated, in-process piece was the lease-lost handling; that is +all this change touches. The rest of the lease lifecycle (issue/guard/expiry in SQL, +the `acquired_token` field, the `_lease_ck` CHECK) is already single-sourced or +intrinsically SQL and stays put. **A future architecture review should not re-suggest +(B)/(C) without new evidence that the SQL-guard constraint has changed.** + +## Files + +- `faststream_outbox/subscriber/usecase.py` — add `_emit_lease_lost`; both flush + methods call it instead of repeating the log+metric. + +## Verification + +- [x] Existing tests pin the contract (invoke `_flush_terminal`/`_flush_retry` + directly): `test_flush_terminal_logs_lease_lost_at_warning_with_structured_fields`, + `..._retry...`, `test_metrics_lease_lost_terminal/retry_emits_recorder_event`, + `test_dispatch_one_lease_lost_emits_only_lease_lost_not_acked` — `uv run pytest + tests/test_unit.py -k lease_lost` green (6 passed). +- [x] `just lint-ci` — clean. +- [x] `just test` — full suite green at 100% coverage (543 passed). From 311d834a9765ebc5fd86a963a9473e6f7c218dcb Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 23 Jun 2026 18:22:13 +0300 Subject: [PATCH 2/2] docs(planning): record PR #110 on consolidate-lease-lost change Co-Authored-By: Claude Opus 4.8 (1M context) --- planning/changes/2026-06-23.02-consolidate-lease-lost/change.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planning/changes/2026-06-23.02-consolidate-lease-lost/change.md b/planning/changes/2026-06-23.02-consolidate-lease-lost/change.md index e2b9fed..7f553d1 100644 --- a/planning/changes/2026-06-23.02-consolidate-lease-lost/change.md +++ b/planning/changes/2026-06-23.02-consolidate-lease-lost/change.md @@ -5,7 +5,7 @@ slug: consolidate-lease-lost summary: Consolidate the duplicated lease-lost detect→log→emit block shared by _flush_terminal and _flush_retry into one _emit_lease_lost helper. supersedes: null superseded_by: null -pr: null +pr: 110 outcome: | Landed as the minimal (A) form: `_emit_lease_lost(row, *, phase)` in subscriber/usecase.py; both flush methods call it. ~36 duplicated lines removed.