72 changes: 32 additions & 40 deletions faststream_outbox/subscriber/usecase.py
@@ -718,6 +718,36 @@ async def _safe_flush(
             return False
         return await flush(row, writer_conn=writer_conn)

+    def _emit_lease_lost(self, row: OutboxInnerMessage, *, phase: str) -> None:
+        """
+        Log + record the ``lease_lost`` event shared by the terminal and retry flush paths.
+
+        A terminal/retry write that finds ``rowcount == 0`` means a newer fetch reclaimed
+        the row (its lease expired mid-handler) — the row will be redelivered, so the
+        caller must NOT also emit an acked/nacked metric (P17). Both flush paths report
+        this identically apart from ``phase`` (``"terminal"`` | ``"retry"``).
+        """
+        self._log(
+            log_level=logging.WARNING,
+            message=f"Outbox row {row} lease expired before {phase} write; skipping",
+            extra={
+                "event": "lease_lost",
+                "phase": phase,
+                "row_id": row.id,
+                "queue": row.queue,
+                "deliveries_count": row.deliveries_count,
+            },
+        )
+        self._emit_metric(
+            "lease_lost",
+            {
+                **self._base_tags(row.queue),
+                "phase": phase,
+                "row_id": row.id,
+                "deliveries_count": row.deliveries_count,
+            },
+        )
+
     async def _flush_terminal(
         self,
         row: OutboxInnerMessage,
@@ -746,26 +776,7 @@ async def _flush_terminal(
             dlq_payload=dlq_payload,
         )
         if not deleted:
-            self._log(
-                log_level=logging.WARNING,
-                message=f"Outbox row {row} lease expired before delete; skipping",
-                extra={
-                    "event": "lease_lost",
-                    "phase": "terminal",
-                    "row_id": row.id,
-                    "queue": row.queue,
-                    "deliveries_count": row.deliveries_count,
-                },
-            )
-            self._emit_metric(
-                "lease_lost",
-                {
-                    **self._base_tags(row.queue),
-                    "phase": "terminal",
-                    "row_id": row.id,
-                    "deliveries_count": row.deliveries_count,
-                },
-            )
+            self._emit_lease_lost(row, phase="terminal")
             return False
         if dlq_payload is not None:
             # P34: omit exception_type when there's no exception (e.g. max_deliveries)
@@ -801,26 +812,7 @@ async def _flush_retry(
             last_attempt_at=row.last_attempt_at,  # ty: ignore[invalid-argument-type]
         )
         if not updated:
-            self._log(
-                log_level=logging.WARNING,
-                message=f"Outbox row {row} lease expired before retry update; skipping",
-                extra={
-                    "event": "lease_lost",
-                    "phase": "retry",
-                    "row_id": row.id,
-                    "queue": row.queue,
-                    "deliveries_count": row.deliveries_count,
-                },
-            )
-            self._emit_metric(
-                "lease_lost",
-                {
-                    **self._base_tags(row.queue),
-                    "phase": "retry",
-                    "row_id": row.id,
-                    "deliveries_count": row.deliveries_count,
-                },
-            )
+            self._emit_lease_lost(row, phase="retry")
             return False
         return True

72 changes: 72 additions & 0 deletions planning/changes/2026-06-23.02-consolidate-lease-lost/change.md
@@ -0,0 +1,72 @@
---
status: shipped
date: 2026-06-23
slug: consolidate-lease-lost
summary: Consolidate the duplicated lease-lost detect→log→emit block shared by _flush_terminal and _flush_retry into one _emit_lease_lost helper.
supersedes: null
superseded_by: null
pr: 110
outcome: |
  Landed as the minimal (A) form: `_emit_lease_lost(row, *, phase)` in
  subscriber/usecase.py; both flush methods call it. ~36 duplicated lines removed.
  Existing lease-lost unit tests (which invoke the flush methods directly) passed
  unchanged as the regression guard; full suite 543 passed at 100% coverage. (B) the
  Lease value object and (C) the Lease module were evaluated and rejected — see Approach.
---

# Change: Give lease-lost telemetry one home

**Lane:** lightweight — ~15 LOC net, 1 file, no new file, no public-API change,
existing unit tests cover it.

## Goal

`_flush_terminal` and `_flush_retry` (`subscriber/usecase.py`) each carried a
near-identical 20-line tail: a WARNING log (`extra={event, phase, row_id, queue,
deliveries_count}`) plus `_emit_metric("lease_lost", …)`. The two copies differed only
in `phase` (`"terminal"`/`"retry"`) and in the log wording ("before delete" vs.
"before retry update"). Any change to lease-lost telemetry meant editing both, with a
risk of divergence. Collapse the duplication into one `_emit_lease_lost(row, *, phase)`.

## Approach

Extract the log+metric into `OutboxSubscriber._emit_lease_lost(row, *, phase)`. Each
flush method keeps its own explicit branch (`if not deleted:` / `if not updated:`,
then `self._emit_lease_lost(row, phase=…); return False`), so the control flow a
reader wants to see (the rowcount-0 → redeliver decision) stays visible. The
terminal/retry blocks were identical apart from `phase` and the log wording, so the
helper captures both; the log prose now derives from `phase` ("before {phase} write";
no test couples to the wording).
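
The shape of the extraction can be sketched in isolation. This is a minimal, self-contained illustration, not the real `OutboxSubscriber`: `Row`, `SketchSubscriber`, the `metrics` list, and the `deleted`/`updated` keyword arguments are hypothetical stand-ins for the row type, the metric recorder, and the result of the client write.

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger("outbox.sketch")


@dataclass
class Row:
    """Hypothetical stand-in for OutboxInnerMessage (only the fields the helper reads)."""

    id: int
    queue: str
    deliveries_count: int


@dataclass
class SketchSubscriber:
    """Hypothetical, trimmed-down subscriber; metrics are collected in a plain list."""

    metrics: list = field(default_factory=list)

    def _emit_lease_lost(self, row: Row, *, phase: str) -> None:
        # Single home for the telemetry: the log and the metric both derive from `phase`.
        tags = {
            "phase": phase,
            "row_id": row.id,
            "queue": row.queue,
            "deliveries_count": row.deliveries_count,
        }
        logger.warning(
            "Outbox row %s lease expired before %s write; skipping",
            row.id,
            phase,
            extra={"event": "lease_lost", **tags},
        )
        self.metrics.append(("lease_lost", tags))

    def flush_terminal(self, row: Row, *, deleted: bool) -> bool:
        # The rowcount-0 branch stays in the caller, so the decision remains visible.
        if not deleted:
            self._emit_lease_lost(row, phase="terminal")
            return False
        return True

    def flush_retry(self, row: Row, *, updated: bool) -> bool:
        if not updated:
            self._emit_lease_lost(row, phase="retry")
            return False
        return True
```

Keeping the `if not …` branch in each caller means the helper only reports; it never decides whether the row was lost.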

**Scope deliberately minimal — (B) and (C) rejected.** This is candidate #2 from the
2026-06-23 architecture review ("give the lease a home"). The grand forms were
evaluated and dropped:

- **(B) a `Lease` value object** `(message_id, token)` threaded through the client
interface — rejected: the pair is passed at only two call sites, and it would ripple
through `AbstractOutboxClient` + both adapters + the just-landed
`tests/test_client_contract.py` for modest gain.
- **(C) a full "Lease module"** owning issue/guard/cutoff — rejected for the same
reason [[client-rules-kernel]] left eligibility/lease/retry-timing as two
implementations: the real client runs the lease guard *in SQL*
(`WHERE acquired_token = :token`), so a pure Lease module would have one Python
consumer (the subscriber) — a hypothetical seam, i.e. indirection.

The only genuinely duplicated, in-process piece was the lease-lost handling; that is
all this change touches. The rest of the lease lifecycle (issue/guard/expiry in SQL,
the `acquired_token` field, the `<table>_lease_ck` CHECK) is already single-sourced or
intrinsically SQL and stays put. **A future architecture review should not re-suggest
(B)/(C) without new evidence that the SQL-guard constraint has changed.**
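
To make the SQL-guard argument concrete, here is a hedged sketch of a lease-guarded write using the standard-library `sqlite3` module. The `outbox` table, its columns other than `acquired_token`, and the `delete_with_lease` function are assumptions for illustration; the real client's schema and statements are not shown in this change.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical minimal table; only `acquired_token` is named in the change record.
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, acquired_token TEXT)")
conn.execute("INSERT INTO outbox (id, acquired_token) VALUES (1, 'token-a')")


def delete_with_lease(db: sqlite3.Connection, row_id: int, token: str) -> bool:
    """Delete the row only if the caller still holds the lease."""
    cur = db.execute(
        "DELETE FROM outbox WHERE id = :id AND acquired_token = :token",
        {"id": row_id, "token": token},
    )
    # rowcount == 0 means the guard did not match: the lease was lost.
    return cur.rowcount == 1


# A newer fetch reclaims the row after the first lease expires.
conn.execute("UPDATE outbox SET acquired_token = 'token-b' WHERE id = 1")

stale_ok = delete_with_lease(conn, 1, "token-a")  # old holder: guard fails
fresh_ok = delete_with_lease(conn, 1, "token-b")  # new holder: guard matches
```

Because the guard is a `WHERE` clause, the only thing left for Python to do on a miss is report it, which is the part this change consolidates.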

## Files

- `faststream_outbox/subscriber/usecase.py` — add `_emit_lease_lost`; both flush
methods call it instead of repeating the log+metric.

## Verification

- [x] Existing tests pin the contract (invoke `_flush_terminal`/`_flush_retry`
directly): `test_flush_terminal_logs_lease_lost_at_warning_with_structured_fields`,
`..._retry...`, `test_metrics_lease_lost_terminal/retry_emits_recorder_event`,
`test_dispatch_one_lease_lost_emits_only_lease_lost_not_acked` — `uv run pytest
tests/test_unit.py -k lease_lost` green (6 passed).
- [x] `just lint-ci` — clean.
- [x] `just test` — full suite green at 100% coverage (543 passed).
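
For illustration only, a regression guard of this kind can assert on the log level and the structured fields rather than on the message text, which is what lets the wording change without breaking tests. The bodies of the tests named above are not shown here; `ListHandler`, `emit_lease_lost`, and `check_lease_lost_fields` are hypothetical names, and the sketch uses only the standard-library `logging` module.

```python
import logging


class ListHandler(logging.Handler):
    """Collects emitted records so assertions can inspect their structured fields."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def emit_lease_lost(logger: logging.Logger, *, row_id: int, phase: str) -> None:
    # Hypothetical stand-in for the helper under test.
    logger.warning(
        "Outbox row %s lease expired before %s write; skipping",
        row_id,
        phase,
        extra={"event": "lease_lost", "phase": phase, "row_id": row_id},
    )


def check_lease_lost_fields(phase: str) -> logging.LogRecord:
    logger = logging.getLogger(f"outbox.sketch.{phase}")
    logger.setLevel(logging.WARNING)
    logger.propagate = False
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        emit_lease_lost(logger, row_id=42, phase=phase)
    finally:
        logger.removeHandler(handler)
    (record,) = handler.records  # exactly one record is expected
    # Assert on level and structured fields only, never on the message wording.
    assert record.levelno == logging.WARNING
    assert record.event == "lease_lost"
    assert record.phase == phase
    assert record.row_id == 42
    return record
```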