Merged
Conversation
AcknowledgementMiddleware catches handler exceptions in its own __aexit__ and calls message.nack() directly, so the worker loop's defensive `except BaseException` never ran in practice. _nack saw last_exception=None and the documented "retry only on transient errors" subclass pattern (retry.py:13-15) was silently broken. Add a small BaseMiddleware whose __aexit__ stashes exc_val onto the OutboxInnerMessage. It sits at the top of the broker_middlewares tuple so its __aexit__ runs before AcknowledgementMiddleware's in stack-pop order, populating row.last_exception in time for nack to read it. Drop the now-truly-dead `except BaseException` block in _worker_loop. Tests cover both the happy path (exception is captured) and the documented subclass pattern (terminate on ValueError, retry on RuntimeError). Strengthen CLAUDE.md's import rule: never use local/inline imports.
OutboxRouter(prefix="svc-") used to silently rewrite subscriber queue
names (subscriber("orders") → fetches "svc-orders") but broker.publish
inserted the literal queue, so a router-prefixed subscriber would never
see rows produced via broker.publish(queue="orders"). The producer side
can't resolve a prefix coherently — a queue name can belong to multiple
routers — so the only sane resolution is to drop the knob entirely.
After this change, queues are routed by their literal name. If you want
namespacing, put it in the queue string itself.
- Remove `prefix` parameter from OutboxRouter.__init__.
- Remove `OutboxSubscriberConfig.full_queues`; subscriber now reads
`self.queues` directly.
- Drop the prefix lookup in OutboxSubscriberSpecification.name (the spec
name now uses the literal queue list).
- Add a fake-broker regression test that exercises router-registered
subscriber + plain-queue feed end-to-end.
OutboxBrokerConfig.outbox_table was set in the broker constructor but never read — the broker stores its own self._outbox_table for inserts. OutboxBrokerConfig.time_source had a default of _utcnow but was only read by OutboxSubscriberConfig.time_source, a property only ever read by tests; the actual subscriber code paths use module-level _utcnow() from message.py. Both attributes were traps: a user setting time_source expecting it to take effect would see no behavior change. Drop them, drop the subscriber-config passthrough property, and drop the two unit tests that exercised the dead path.
A subscriber with one consistently slow handler hits release_stuck on every interval. Logging that as WARNING produced a steady stream of alert-shaped messages for an entirely benign condition. INFO is the right level — operators who care can rate-alert via their log aggregator. The error path stays at ERROR.
The release_stuck advisory-lock query was built via f-string against outbox_table.name. Blast radius is small (the table name comes from developer config, not user input), but it's a free SQL injection surface that goes away with one bindparam. Lock-key value is unchanged (hashtext over the same string) so existing deployments are unaffected. Add a unit test that constructs an OutboxClient with a hostile-looking table name and asserts the literal does not appear in the compiled SQL, locking the parameterization in against future refactors.
…t_at Prior to this change, the retry strategy returned an absolute datetime computed from a Python-clock last_attempt_at, but the fetch query checked next_attempt_at <= func.now() against the DB clock. With drift between worker and DB hosts, retries fired at the wrong time. CLAUDE.md already called out the same asymmetry as fixed for release_stuck via make_interval; do the same here. RetryStrategyProto.get_next_attempt_at -> get_next_attempt_delay; return type changes from datetime | None to float | None (seconds). Strategy implementations no longer touch datetime arithmetic. The broker reads the delay off the row in _flush_retry and hands it to mark_pending_with_lease, which builds next_attempt_at server-side via now() + make_interval(secs => :delay). FakeOutboxClient mirrors the new shape with _utcnow() + timedelta. OutboxInnerMessage gets a pending_delay_seconds field that _nack populates and _flush_retry reads. Breaking change to RetryStrategyProto. v0, no compat shim. Integration test asserts next_attempt_at lands in [clock_timestamp() + delay, clock_timestamp() + delay] on either side of the call, locking in the server-side computation.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.