Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ The caller owns the `AsyncEngine`. `OutboxBrokerConfig.disconnect()` deliberatel

## Conventions

- Python 3.13+. Global imports (no inline imports) per user's global rules.
- Python 3.13+.
- **Never use local/inline imports.** All imports go at the top of the module — no `import` statements inside functions, methods, or `if TYPE_CHECKING` exception aside. This applies to test files too. If a `# noqa: PLC0415` is the only way to keep an import inline, hoist it instead.
- `ruff` is set to `select = ["ALL"]` with a documented ignore list in `pyproject.toml`; many `# noqa: XXX` comments are intentional and align with that list.
- Type checker is `ty`. Use `# ty: ignore[<rule>]` for intentional escapes (matches existing usage in `broker.py`, `registrator.py`).
27 changes: 25 additions & 2 deletions faststream_outbox/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
import typing
from collections.abc import Iterable, Sequence
from types import TracebackType

from faststream import BaseMiddleware
from faststream._internal.basic_types import LoggerProto
Expand Down Expand Up @@ -40,6 +41,29 @@
from faststream_outbox.subscriber.usecase import OutboxSubscriber


class _CaptureExceptionMiddleware(BaseMiddleware):
"""
Stash the handler exception on the inner row before AckMiddleware nacks.

FastStream's AcknowledgementMiddleware catches the handler exception in its
own ``__aexit__`` and calls ``message.nack()`` directly — the exception never
propagates back to the worker loop. Without this middleware, ``OutboxInnerMessage._nack``
sees ``last_exception=None`` and retry strategies that branch on exception type
can't work. We sit one step closer to the handler in the middleware stack so
our ``__aexit__`` runs before AckMiddleware's, capturing ``exc_val`` onto the row.
"""

async def __aexit__(
self,
exc_type: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: TracebackType | None = None,
) -> bool | None:
if exc_val is not None and isinstance(self.msg, OutboxInnerMessage):
self.msg.last_exception = exc_val
return False


class OutboxParamsStorage(DefaultLoggerStorage):
_max_msg_id_ln = -1
_max_queue_name = 7
Expand Down Expand Up @@ -98,9 +122,8 @@ def __init__( # noqa: PLR0913
fd_config = FastDependsConfig(use_fastdepends=apply_types)
broker_config = OutboxBrokerConfig(
engine_state=engine_state,
outbox_table=outbox_table,
client=client,
broker_middlewares=middlewares,
broker_middlewares=(_CaptureExceptionMiddleware, *middlewares),
broker_parser=parser,
broker_decoder=decoder,
logger=make_logger_state(
Expand Down
15 changes: 9 additions & 6 deletions faststream_outbox/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None:
self._engine = engine
self._table = outbox_table
# Stable advisory-lock key derived from the table name; ``hashtext`` returns int4.
self._advisory_lock_sql = text(
f"SELECT pg_try_advisory_xact_lock(hashtext('faststream_outbox:{outbox_table.name}'))"
# Parameterized to keep the table name out of SQL string interpolation.
self._advisory_lock_sql = text("SELECT pg_try_advisory_xact_lock(hashtext(:lock_key))").bindparams(
lock_key=f"faststream_outbox:{outbox_table.name}"
)

@property
Expand Down Expand Up @@ -113,23 +114,25 @@ async def mark_pending_with_lease( # noqa: PLR0913
message_id: int,
acquired_token: uuid.UUID,
*,
next_attempt_at: _dt.datetime,
delay_seconds: float,
attempts_count: int,
first_attempt_at: _dt.datetime,
last_attempt_at: _dt.datetime,
) -> bool:
"""
Move *message_id* back to ``pending`` for retry, iff it still holds the lease.

Returns True if the row was updated.
``next_attempt_at`` is computed server-side as ``now() + delay_seconds`` so
retry timing uses the DB clock, not the worker's. Returns True if the row was updated.
"""
t = self._table
next_attempt_at_expr = func.now() + func.make_interval(0, 0, 0, 0, 0, 0, bindparam("delay", type_=Float))
stmt = (
update(t)
.where(t.c.id == message_id, t.c.acquired_token == acquired_token)
.values(
state=OutboxState.PENDING.value,
next_attempt_at=next_attempt_at,
next_attempt_at=next_attempt_at_expr,
attempts_count=attempts_count,
first_attempt_at=first_attempt_at,
last_attempt_at=last_attempt_at,
Expand All @@ -138,7 +141,7 @@ async def mark_pending_with_lease( # noqa: PLR0913
)
)
async with self._engine.begin() as conn:
result = await conn.execute(stmt)
result = await conn.execute(stmt, {"delay": max(0.0, delay_seconds)})
return (result.rowcount or 0) > 0

async def release_stuck(self, *, timeout_seconds: float) -> int:
Expand Down
9 changes: 0 additions & 9 deletions faststream_outbox/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,19 @@
client and to subscribers.
"""

import datetime as _dt
import typing
from collections.abc import Callable
from dataclasses import dataclass, field

from faststream._internal.configs import BrokerConfig
from faststream.exceptions import IncorrectState


if typing.TYPE_CHECKING:
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import AsyncEngine

from faststream_outbox.client import OutboxClient


def _utcnow() -> _dt.datetime:
return _dt.datetime.now(tz=_dt.UTC)


class EngineState:
"""
Lazy holder so the broker can be constructed before the engine is wired up.
Expand All @@ -51,9 +44,7 @@ def set_engine(self, engine: "AsyncEngine") -> None:
@dataclass(kw_only=True)
class OutboxBrokerConfig(BrokerConfig):
engine_state: EngineState = field(default_factory=EngineState)
outbox_table: "Table | None" = None
client: "OutboxClient | None" = None
time_source: Callable[[], _dt.datetime] = _utcnow

async def connect(self) -> None:
# Engine and client are wired up by the broker's constructor; nothing to do here.
Expand Down
11 changes: 7 additions & 4 deletions faststream_outbox/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class OutboxInnerMessage:

state_set: bool = field(default=False, init=False)
to_delete: bool = field(default=False, init=False)
# Set by ``_nack`` when the strategy schedules a retry; consumed by the
# subscriber's ``_flush_retry`` to drive ``mark_pending_with_lease``.
pending_delay_seconds: float | None = field(default=None, init=False)

async def ack(self) -> None:
await self._update_state_if_not_set(self._ack)
Expand All @@ -81,8 +84,8 @@ async def _ack(self) -> None:

async def _nack(self) -> None:
self._record_attempt()
next_at = (
self.retry_strategy.get_next_attempt_at(
delay = (
self.retry_strategy.get_next_attempt_delay(
first_attempt_at=self.first_attempt_at or self.last_attempt_at, # ty: ignore[invalid-argument-type]
last_attempt_at=self.last_attempt_at, # ty: ignore[invalid-argument-type]
attempts_count=self.attempts_count,
Expand All @@ -91,10 +94,10 @@ async def _nack(self) -> None:
if self.retry_strategy is not None
else None
)
if next_at is None:
if delay is None:
self.to_delete = True
else:
self.next_attempt_at = next_at
self.pending_delay_seconds = delay

async def _reject(self) -> None:
self._record_attempt()
Expand Down
37 changes: 19 additions & 18 deletions faststream_outbox/retry.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
import random
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from datetime import datetime
from typing import Protocol


class RetryStrategyProto(Protocol):
"""
Decides whether a Nack'ed row gets another attempt and when.

Implementations return ``None`` to signal terminal failure (the row will be
deleted). The current ``exception`` (if any) is passed through so users can
subclass to retry only on transient errors.
Decides whether a Nack'ed row gets another attempt and how long to wait.

Implementations return the delay in seconds before the next attempt, or
``None`` to signal terminal failure (the row will be deleted). The DB
computes the actual ``next_attempt_at`` from this delay using its own clock,
so retry timing is immune to skew between worker and DB hosts. The current
``exception`` (if any) is passed through so users can subclass to retry only
on transient errors.
"""

def get_next_attempt_at(
def get_next_attempt_delay(
self,
*,
first_attempt_at: datetime,
last_attempt_at: datetime,
attempts_count: int,
exception: BaseException | None = None,
) -> datetime | None: ...
) -> float | None: ...


@dataclass(kw_only=True)
Expand All @@ -32,31 +35,29 @@ class _RetryStrategyTemplate(ABC, RetryStrategyProto):
@abstractmethod
def _delay_seconds(self, *, attempts_count: int) -> float: ...

def get_next_attempt_at(
def get_next_attempt_delay(
self,
*,
first_attempt_at: datetime,
last_attempt_at: datetime,
attempts_count: int,
exception: BaseException | None = None, # noqa: ARG002
) -> datetime | None:
) -> float | None:
if self.max_attempts is not None and attempts_count >= self.max_attempts:
return None
delay = self._delay_seconds(attempts_count=attempts_count)
next_attempt_at = last_attempt_at + timedelta(seconds=delay)
if (
self.max_total_delay_seconds is not None
and (next_attempt_at - first_attempt_at).total_seconds() > self.max_total_delay_seconds
):
return None
return next_attempt_at
if self.max_total_delay_seconds is not None:
elapsed_so_far = (last_attempt_at - first_attempt_at).total_seconds()
if elapsed_so_far + delay > self.max_total_delay_seconds:
return None
return delay


@dataclass(kw_only=True)
class NoRetry(RetryStrategyProto):
"""No retry — first nack is terminal."""

def get_next_attempt_at(
def get_next_attempt_delay(
self,
*,
first_attempt_at: datetime, # noqa: ARG002
Expand Down
11 changes: 8 additions & 3 deletions faststream_outbox/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ def __init__( # noqa: PLR0913


class OutboxRouter(OutboxRegistrator, BrokerRouter[OutboxInnerMessage, BrokerConfig]):
"""Includable router for ``OutboxBroker``."""
"""
Includable router for ``OutboxBroker``.

Use it to register subscribers in a separate module and attach them to the
broker via ``broker.include_router(router)``. There is no ``prefix`` knob:
queues are routed by their literal name, so producers and consumers must
agree on the exact string. If you want namespacing, put it in the queue name.
"""

def __init__( # noqa: PLR0913
self,
prefix: str = "",
handlers: Iterable[OutboxRoute] = (),
*,
dependencies: Iterable["Dependant"] = (),
Expand All @@ -83,7 +89,6 @@ def __init__( # noqa: PLR0913
broker_parser=parser,
broker_decoder=decoder,
include_in_schema=include_in_schema,
prefix=prefix,
),
handlers=handlers, # ty: ignore[unknown-argument]
routers=routers,
Expand Down
11 changes: 0 additions & 11 deletions faststream_outbox/subscriber/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import datetime as _dt
import typing
from collections.abc import Callable
from dataclasses import dataclass

from faststream._internal.configs import SubscriberSpecificationConfig, SubscriberUsecaseConfig
Expand All @@ -26,15 +24,6 @@ class OutboxSubscriberConfig(SubscriberUsecaseConfig):
release_stuck_interval: float
max_deliveries: int | None

@property
def full_queues(self) -> list[str]:
prefix = self._outer_config.prefix or ""
return [f"{prefix}{q}" for q in self.queues]

@property
def time_source(self) -> Callable[[], _dt.datetime]:
return self._outer_config.time_source

@property
def ack_policy(self) -> AckPolicy:
if self._ack_policy is EMPTY:
Expand Down
19 changes: 8 additions & 11 deletions faststream_outbox/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@
class OutboxSubscriberSpecification(SubscriberSpecification["OutboxBrokerConfig", OutboxSubscriberSpecificationConfig]):
@property
def name(self) -> str:
prefix = getattr(self._outer_config, "prefix", "")
joined = ",".join(self.config.queues)
return f"{prefix}{joined}:{self.call_name}"
return f"{joined}:{self.call_name}"

def get_schema(self) -> dict[str, SubscriberSpec]:
return {
Expand Down Expand Up @@ -94,7 +93,7 @@ def _client(self) -> "OutboxClient":

@property
def _queues(self) -> list[str]:
return self._config.full_queues
return self._config.queues

@typing.override
async def start(self) -> None:
Expand Down Expand Up @@ -151,13 +150,11 @@ async def _worker_loop(self) -> None:
if not row.allow_delivery(max_deliveries=self._config.max_deliveries, logger=logger):
await self._flush_terminal(row)
continue
# FastStream's AckPolicy middleware catches handler exceptions before
# they reach this except, so this branch is defensive.
# AckPolicy middleware catches handler exceptions; _CaptureExceptionMiddleware
# stashes exc onto row.last_exception before nack runs, so retry strategies
# can branch on exception type.
try:
await self.consume(row)
except BaseException as exc: # pragma: no cover
row.last_exception = exc
raise
finally:
await row.assert_state_set(logger)
await self._flush_result(row)
Expand All @@ -183,12 +180,12 @@ async def _flush_terminal(self, row: OutboxInnerMessage) -> None:
)

async def _flush_retry(self, row: OutboxInnerMessage) -> None:
if row.acquired_token is None:
if row.acquired_token is None or row.pending_delay_seconds is None:
return
updated = await self._client.mark_pending_with_lease(
row.id,
row.acquired_token,
next_attempt_at=row.next_attempt_at,
delay_seconds=row.pending_delay_seconds,
attempts_count=row.attempts_count,
first_attempt_at=row.first_attempt_at, # ty: ignore[invalid-argument-type]
last_attempt_at=row.last_attempt_at, # ty: ignore[invalid-argument-type]
Expand All @@ -210,7 +207,7 @@ async def _release_stuck_loop(self) -> None:
else:
if released:
self._log(
log_level=logging.WARNING,
log_level=logging.INFO,
message=f"release_stuck reset {released} stale rows back to pending",
)
await anyio.sleep(interval)
Expand Down
4 changes: 2 additions & 2 deletions faststream_outbox/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ async def mark_pending_with_lease( # noqa: PLR0913
message_id: int,
acquired_token: uuid.UUID,
*,
next_attempt_at: _dt.datetime,
delay_seconds: float,
attempts_count: int,
first_attempt_at: _dt.datetime,
last_attempt_at: _dt.datetime,
) -> bool:
for row in self._rows:
if row.id == message_id and row.acquired_token == acquired_token:
row.state = OutboxState.PENDING.value
row.next_attempt_at = next_attempt_at
row.next_attempt_at = _utcnow() + _dt.timedelta(seconds=max(0.0, delay_seconds))
row.attempts_count = attempts_count
row.first_attempt_at = first_attempt_at
row.last_attempt_at = last_attempt_at
Expand Down
Loading
Loading