feat(N4): inbound dead-letter queue for failed bot messages#1600
feat(N4): inbound dead-letter queue for failed bot messages#1600MervinPraison wants to merge 2 commits intomainfrom
Conversation
Persist failed agent.chat() calls to a SQLite-backed DLQ so user
messages are never silently lost on transient LLM failures.
- praisonai/bots/_dlq.py: InboundDLQ + DLQEntry (sqlite3 stdlib only,
WAL mode, atomic writes via per-instance threading.Lock; TTL +
max_size eviction; async replay with success/failure callback).
- praisonai/bots/_session.py: BotSessionManager(dlq=) optional param;
chat() now enqueues on agent.chat() exception then re-raises (so
adapter still shows user a friendly error). Default OFF preserves
legacy behaviour bit-for-bit.
- praisonai/bots/__init__.py: lazy exports for InboundDLQ, DLQEntry,
BotSessionManager.
- praisonai/cli/commands/bot.py: 'praisonai bot dlq {list,replay,purge}'
subcommand group with --path / --limit / --yes flags.
- tests/unit/bots/test_dlq.py: 11 unit tests (basic/persistence/bounds/
TTL/replay/wired-failure-path).
- scripts/smoke_n4_real.py: real agentic E2E β patch agent.chat to fail
once, assert DLQ holds entry, replay through real LLM (anthropic/
claude-haiku-4-5) returns expected answer.
Test results:
11/11 DLQ unit tests pass
102/102 bot/session/gateway regression pass (no W1 here, single
pre-existing unrelated test deselected)
Real agentic: 'What is 2+2?' fails, lands in DLQ, replay -> '4'
Docs: features/inbound-dlq.mdx with mermaid (Dark Red agents / Teal
tools / white text), CardGroup, Tabs, ParamField, Expandable,
AccordionGroup, Tip/Warning/Info/Check/Badge per Mintlify.
Closes N4.
β You've reached your Qodo monthly free-tier limit. Reviews pause until next month β upgrade your plan to continue now, or link your paid account if you already have one. |
π WalkthroughWalkthroughAdds a SQLite-backed inbound dead-letter queue (InboundDLQ), integrates it with BotSessionManager to enqueue failed inbound chats, provides CLI commands to list/purge/replay the DLQ, includes unit tests for DLQ behavior, and adds a real-LLM smoke script that exercises enqueue + replay end-to-end. ChangesInbound Dead-Letter Queue System
Sequence DiagramsequenceDiagram
participant User
participant BotSessionManager
participant Agent
participant InboundDLQ as DLQ
participant LLM
rect rgba(255, 0, 0, 0.5)
Note over BotSessionManager,InboundDLQ: Failure path
User->>BotSessionManager: chat(user_id, prompt)
BotSessionManager->>Agent: agent.chat(prompt)
Agent->>LLM: send prompt
LLM-->>Agent: error/exception
Agent-->>BotSessionManager: raises
BotSessionManager->>InboundDLQ: enqueue(entry)
InboundDLQ-->>BotSessionManager: entry_id
BotSessionManager-->>User: exception re-raised
end
rect rgba(0, 0, 255, 0.5)
Note over InboundDLQ,BotSessionManager: Replay path
User->>InboundDLQ: replay(handler)
loop for each queued entry (oldest-first)
InboundDLQ->>BotSessionManager: handler(entry)
BotSessionManager->>Agent: agent.chat(entry.prompt)
Agent->>LLM: send prompt
LLM-->>Agent: response
Agent-->>BotSessionManager: response
BotSessionManager-->>InboundDLQ: True (success) / False (fail)
InboundDLQ->>InboundDLQ: delete on success / inc attempts on fail
end
InboundDLQ-->>User: (succeeded, failed)
end
Estimated code review effortπ― 3 (Moderate) | β±οΈ ~25 minutes Poem
π₯ Pre-merge checks | β 4 | β 1β Failed checks (1 warning)
β Passed checks (4 passed)
βοΈ Tip: You can configure your own custom pre-merge checks in the settings. β¨ Finishing Touchesπ Generate docstrings
π§ͺ Generate unit tests (beta)
Tip π¬ Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. π Get started Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.Comment |
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first β incorporate their findings. Review areas:
|
Greptile SummaryThis PR adds an opt-in SQLite-backed dead-letter queue (N4) to Two new P2 concerns remain:
Confidence Score: 5/5Safe to merge; all remaining findings are P2 and do not affect correctness on the default no-DLQ path. No P0 or P1 issues found. The two new P2 findings (sync DB calls in async replay, undocumented exception-path behavior change) are low-impact: replay is operator-initiated and typically runs on small queues, and the history behavior change is actually an improvement.
Important Files Changed
Sequence DiagramsequenceDiagram
participant Adapter as Bot Adapter
participant BSM as BotSessionManager
participant Agent as Agent
participant DLQ as InboundDLQ (SQLite)
participant Store as SessionStore
Adapter->>BSM: chat(agent, user_id, prompt)
BSM->>BSM: load user history
BSM->>Agent: swap chat_history β user history
BSM->>Agent: run_in_executor(agent.chat, prompt)
alt agent.chat succeeds
Agent-->>BSM: response
BSM->>Agent: capture updated_history, restore saved_history
BSM->>Store: run_in_executor(_save_history)
BSM-->>Adapter: return response
else agent.chat raises
Agent-->>BSM: Exception
opt dlq configured
BSM->>DLQ: run_in_executor(enqueue(platform, user_id, prompt, error))
Note over DLQ: BEGIN IMMEDIATE / evict_expired / INSERT / evict_overflow / COMMIT
end
BSM->>Agent: restore saved_history (no _save_history call)
BSM-->>Adapter: re-raise exception
end
Note over Adapter: Later β operator replay
Adapter->>DLQ: replay(handler)
loop each entry (oldest first)
DLQ->>BSM: await handler(entry)
BSM->>Agent: chat(entry.prompt)
alt success
DLQ->>DLQ: _delete(entry.id)
else failure
DLQ->>DLQ: _increment_attempts(entry.id)
end
end
DLQ-->>Adapter: (succeeded, failed)
Reviews (2): Last reviewed commit: "fix(dlq): address critical P1/P2 issues ..." | Re-trigger Greptile |
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import os |
There was a problem hiding this comment.
Hardcoded developer local path in docstring
The Run:: block references cd /Users/praison/worktrees/n4-dlq, a path that exists only on the original author's machine. Anyone else trying to follow the instructions verbatim will get a "No such file or directory" error. Replace with a repository-relative path or a placeholder.
| from __future__ import annotations | |
| import asyncio | |
| import os | |
| cd <repo-root> | |
| PYTHONPATH=src/praisonai-agents:src/praisonai python scripts/smoke_n4_real.py |
There was a problem hiding this comment.
Code Review
This pull request implements an Inbound Dead-Letter Queue (DLQ) to persist failed bot messages using SQLite, including updates to BotSessionManager, new CLI commands, and comprehensive tests. The review identifies a critical issue where synchronous database writes block the asynchronous event loop and suggests using run_in_executor. Additionally, feedback points out that the current SQLite isolation level compromises transaction atomicity and notes that the replay loop is inefficient due to repeated connection overhead.
| self._dlq.enqueue( | ||
| platform=self._platform, | ||
| user_id=user_id, | ||
| prompt=prompt, | ||
| error=f"{type(exc).__name__}: {exc}", | ||
| chat_id=chat_id, | ||
| thread_id=thread_id, | ||
| user_name=user_name, | ||
| ) |
There was a problem hiding this comment.
The call to self._dlq.enqueue is a synchronous, blocking SQLite write performed directly within an async method. This will block the event loop every time a message fails, which is problematic in high-concurrency bot environments. This should be wrapped in run_in_executor to maintain non-blocking behavior.
| self._dlq.enqueue( | |
| platform=self._platform, | |
| user_id=user_id, | |
| prompt=prompt, | |
| error=f"{type(exc).__name__}: {exc}", | |
| chat_id=chat_id, | |
| thread_id=thread_id, | |
| user_name=user_name, | |
| ) | |
| try: | |
| await loop.run_in_executor( | |
| None, | |
| lambda: self._dlq.enqueue( | |
| platform=self._platform, | |
| user_id=user_id, | |
| prompt=prompt, | |
| error=f"{type(exc).__name__}: {exc}", | |
| chat_id=chat_id, | |
| thread_id=thread_id, | |
| user_name=user_name, | |
| ) | |
| ) |
|
|
||
| # ββ Schema ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | ||
| def _connect(self) -> sqlite3.Connection: | ||
| conn = sqlite3.connect(str(self.path), isolation_level=None) |
There was a problem hiding this comment.
Setting isolation_level=None puts the connection in autocommit mode. While this is often used with WAL, the current implementation of enqueue (lines 161-175) relies on the with conn: context manager to provide atomicity. In sqlite3, the context manager only manages transactions if isolation_level is NOT None. As written, the eviction and insertion steps in enqueue are not atomic. Consider removing this or explicitly using BEGIN / COMMIT statements.
| conn = sqlite3.connect(str(self.path), isolation_level=None) | |
| conn = sqlite3.connect(str(self.path)) |
| for entry in entries: | ||
| try: | ||
| ok = await handler(entry) | ||
| except Exception as e: # pragma: no cover β defensive | ||
| logger.warning("DLQ replay handler raised: %s", e) | ||
| ok = False | ||
| if ok: | ||
| self._delete(entry.id) | ||
| succeeded += 1 | ||
| else: | ||
| self._increment_attempts(entry.id) | ||
| failed += 1 |
There was a problem hiding this comment.
The replay loop is inefficient because it opens and closes a new SQLite connection for every single entry via _delete or _increment_attempts. For a large DLQ, this will cause significant disk I/O overhead. Consider refactoring these helper methods to accept an existing connection object, or use a single connection for the duration of the loop.
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and canβt be posted inline due to platform limitations.
β οΈ Outside diff range comments (1)
src/praisonai/praisonai/bots/_session.py (1)
175-206:β οΈ Potential issue | π΄ Critical | β‘ Quick winKeep
agent_lockheld for the fullagent.chat()round-trip.This code still runs the executor call after
agent.chat_historyhas been swapped back out from underagent_lock. Two concurrent users sharing the sameAgentcan interleave here, which leaks history across sessions and can restore the wrong state after success or failure. The swap,agent.chat(...), capture, and restore need to live in oneasync with agent_lock/try...finallyblock.π€ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/praisonai/praisonai/bots/_session.py` around lines 175 - 206, The agent lock is released before the chat round-trip and history save, allowing interleaving; modify the flow so the sequence "restore saved_history, call agent.chat(...), capture updated_history, and restore saved_history" occurs while holding async with agent_lock in a try/finally, then release the lock and call loop.run_in_executor(self._save_history, user_id, updated_history) outside the lock; update the exception branch to also restore agent.chat_history inside the same async with agent_lock/try/finally (referencing agent_lock, agent.chat_history, agent.chat(...), saved_history, updated_history, loop.run_in_executor, and _save_history).
π€ Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/smoke_n4_real.py`:
- Around line 27-32: The _pick_model function currently falls back to
"gpt-4o-mini" when no provider env vars are set; change it to fail fast: check
for ANTHROPIC_API_KEY and GOOGLE_API_KEY as today and, if neither is present,
raise a clear RuntimeError (or SystemExit) with an explicit message telling the
user which environment variable(s) must be configured (e.g., "set
ANTHROPIC_API_KEY or GOOGLE_API_KEY to run smoke tests"); keep the existing
returns for the two providers (anthropic/claude-haiku-4-5 and
gemini/gemini-2.5-flash) and only use the exception path when no supported API
key is found so the error surfaces immediately from _pick_model.
In `@src/praisonai/praisonai/bots/_dlq.py`:
- Line 242: The replay currently calls entries = self.list(limit=limit if limit
is not None else self.max_size) which returns newest-first; change it to fetch
entries oldest-first by querying the DLQ with ORDER BY ts ASC (e.g., replace the
self.list call with a query or helper that requests ORDER BY ts ASC or add an
order='ASC' arg to self.list), still honoring limit/max_size, so the variable
entries (and any replay function using it) processes oldest (earliest ts)
entries first; reference symbols: entries, self.list, self.max_size, and the ts
column.
In `@src/praisonai/praisonai/cli/commands/bot.py`:
- Around line 599-601: The replay command currently always exits zero; update
the dlq_replay flow (where asyncio.run(dlq.replay(replayer, limit=limit))
returns succeeded, failed) to raise typer.Exit(1) when there were any failures
and entries remain in the queue (i.e., if failed > 0 and dlq.size() > 0). Locate
the call to dlq.replay and the surrounding dlq_replay command handler, and after
printing the summary check the failed and dlq.size() values and raise
typer.Exit(1) to return a non-zero exit code.
---
Outside diff comments:
In `@src/praisonai/praisonai/bots/_session.py`:
- Around line 175-206: The agent lock is released before the chat round-trip and
history save, allowing interleaving; modify the flow so the sequence "restore
saved_history, call agent.chat(...), capture updated_history, and restore
saved_history" occurs while holding async with agent_lock in a try/finally, then
release the lock and call loop.run_in_executor(self._save_history, user_id,
updated_history) outside the lock; update the exception branch to also restore
agent.chat_history inside the same async with agent_lock/try/finally
(referencing agent_lock, agent.chat_history, agent.chat(...), saved_history,
updated_history, loop.run_in_executor, and _save_history).
πͺ Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
βΉοΈ Review info
βοΈ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7385a1eb-3deb-420c-b8b1-01b388f860fe
π Files selected for processing (6)
scripts/smoke_n4_real.pysrc/praisonai/praisonai/bots/__init__.pysrc/praisonai/praisonai/bots/_dlq.pysrc/praisonai/praisonai/bots/_session.pysrc/praisonai/praisonai/cli/commands/bot.pysrc/praisonai/tests/unit/bots/test_dlq.py
| entries = self.list(limit=limit if limit is not None else self.max_size) | ||
| succeeded = failed = 0 | ||
| for entry in entries: | ||
| try: | ||
| ok = await handler(entry) | ||
| except Exception as e: # pragma: no cover β defensive | ||
| logger.warning("DLQ replay handler raised: %s", e) | ||
| ok = False | ||
| if ok: | ||
| self._delete(entry.id) | ||
| succeeded += 1 | ||
| else: | ||
| self._increment_attempts(entry.id) | ||
| failed += 1 |
There was a problem hiding this comment.
Concurrent replays can deliver the same DLQ entry twice.
replay() snapshots rows and only deletes after await handler(entry), so two replayers can both process the same entry before either delete happens. That risks duplicate bot replies or duplicate downstream side effects. This needs a SQLite-side claim/lease step before handing an entry to the callback.
π§° Tools
πͺ Ruff (0.15.12)
[warning] 247-247: Do not catch blind exception: Exception
(BLE001)
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task ββ View job Final Architecture Review - Phase 3: Final Verdict βTodo Checklist:
β APPROVED - All Critical Issues ResolvedArchitecture Compliance Reviewβ
Protocol-driven: DLQ correctly implemented in wrapper layer ( Issues Fixed (Commit:
|
- Fix non-atomic SQLite enqueue operations by using explicit transactions - Fix event loop blocking by wrapping DLQ calls in run_in_executor - Fix replay ordering to process oldest entries first (not newest) - Fix CLI exit codes for user cancellation and failed replays - Fix smoke test to fail fast when no supported API key is found Resolves critical issues identified by Greptile, CodeRabbit, Gemini reviewers Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
β»οΈ Duplicate comments (1)
src/praisonai/praisonai/bots/_dlq.py (1)
264-278:β οΈ Potential issue | π Major | ποΈ Heavy liftConcurrent replay can still deliver the same entry to the handler twice.
replay()snapshots entries via_list_oldest_first(), then releasesself._lockbetween each_delete()call. If two callers runreplay()concurrently they both see the same snapshot and both invokehandler(entry)for the same entries before either delete is committed. For the CLI this is unlikely, but for any future library-level use this is a real duplicate-delivery risk.A SQLite
UPDATE β¦ SET in_flight=1 WHERE in_flight=0claim step before handing an entry to the handler would prevent this.π€ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/praisonai/praisonai/bots/_dlq.py` around lines 264 - 278, replay() currently snapshots entries via _list_oldest_first() and releases self._lock, allowing concurrent replayers to call handler(entry) for the same entry; fix by adding a database "claim" step before calling handler: for each entry returned by _list_oldest_first(), execute an UPDATE that sets in_flight=1 only where in_flight=0 (atomic claim, e.g. UPDATE ... SET in_flight=1 WHERE id=? AND in_flight=0) and skip entries where no rows were updated, then call handler only for claimed entries; on successful handler result call _delete(entry.id), on failure call _increment_attempts(entry.id) and reset in_flight=0 or clear the claim as appropriate; make sure the claim update is persisted (use a transaction/commit) before invoking handler to avoid duplicate delivery between concurrent replay() callers.
π€ Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/praisonai/praisonai/bots/_session.py`:
- Around line 180-191: The lambda passed to loop.run_in_executor captures the
exception name `exc` from the enclosing `except` block, causing Ruff F821;
evaluate the error string eagerly before creating the lambda to avoid
referencing `exc` after the except scope. Specifically, in the except handler
build a local variable (e.g., error_str = f"{type(exc).__name__}: {exc}") and
then call loop.run_in_executor with a lambda that uses `error_str` (and the
existing symbols _dlq.enqueue, platform, user_id, prompt, chat_id, thread_id,
user_name) instead of referencing `exc` directly.
---
Duplicate comments:
In `@src/praisonai/praisonai/bots/_dlq.py`:
- Around line 264-278: replay() currently snapshots entries via
_list_oldest_first() and releases self._lock, allowing concurrent replayers to
call handler(entry) for the same entry; fix by adding a database "claim" step
before calling handler: for each entry returned by _list_oldest_first(), execute
an UPDATE that sets in_flight=1 only where in_flight=0 (atomic claim, e.g.
UPDATE ... SET in_flight=1 WHERE id=? AND in_flight=0) and skip entries where no
rows were updated, then call handler only for claimed entries; on successful
handler result call _delete(entry.id), on failure call
_increment_attempts(entry.id) and reset in_flight=0 or clear the claim as
appropriate; make sure the claim update is persisted (use a transaction/commit)
before invoking handler to avoid duplicate delivery between concurrent replay()
callers.
πͺ Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
βΉοΈ Review info
βοΈ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 45f55c01-6b60-4511-8f5f-15571734d766
π Files selected for processing (4)
scripts/smoke_n4_real.pysrc/praisonai/praisonai/bots/_dlq.pysrc/praisonai/praisonai/bots/_session.pysrc/praisonai/praisonai/cli/commands/bot.py
| await loop.run_in_executor( | ||
| None, | ||
| lambda: self._dlq.enqueue( | ||
| platform=self._platform, | ||
| user_id=user_id, | ||
| prompt=prompt, | ||
| error=f"{type(exc).__name__}: {exc}", | ||
| chat_id=chat_id, | ||
| thread_id=thread_id, | ||
| user_name=user_name, | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Fix Ruff F821 error: exc is captured by name in a lambda inside an except block.
Python 3 implicitly deletes the as exc binding when the except clause exits (to break cyclic GC references). The lambda captures exc by name, not value β Ruff correctly flags this as a potential undefined-name reference (reported twice at [error] level). At runtime the code is safe because await run_in_executor(β¦) completes before the except block exits, but the static analysis error may fail any CI job that enforces Ruff.
Evaluate the error string eagerly before creating the lambda so the binding lifetime is no longer relevant:
π Proposed fix
except Exception as exc:
# N4: persist the failed inbound message before bubbling.
if self._dlq is not None:
+ error_str = f"{type(exc).__name__}: {exc}"
try:
# Use run_in_executor to avoid blocking the event loop
await loop.run_in_executor(
None,
lambda: self._dlq.enqueue(
platform=self._platform,
user_id=user_id,
prompt=prompt,
- error=f"{type(exc).__name__}: {exc}",
+ error=error_str,
chat_id=chat_id,
thread_id=thread_id,
user_name=user_name,
)
)π Committable suggestion
βΌοΈ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| await loop.run_in_executor( | |
| None, | |
| lambda: self._dlq.enqueue( | |
| platform=self._platform, | |
| user_id=user_id, | |
| prompt=prompt, | |
| error=f"{type(exc).__name__}: {exc}", | |
| chat_id=chat_id, | |
| thread_id=thread_id, | |
| user_name=user_name, | |
| ) | |
| ) | |
| error_str = f"{type(exc).__name__}: {exc}" | |
| try: | |
| # Use run_in_executor to avoid blocking the event loop | |
| await loop.run_in_executor( | |
| None, | |
| lambda: self._dlq.enqueue( | |
| platform=self._platform, | |
| user_id=user_id, | |
| prompt=prompt, | |
| error=error_str, | |
| chat_id=chat_id, | |
| thread_id=thread_id, | |
| user_name=user_name, | |
| ) | |
| ) |
π§° Tools
πͺ Ruff (0.15.12)
[error] 186-186: Undefined name exc
(F821)
[error] 186-186: Undefined name exc
(F821)
π€ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/praisonai/praisonai/bots/_session.py` around lines 180 - 191, The lambda
passed to loop.run_in_executor captures the exception name `exc` from the
enclosing `except` block, causing Ruff F821; evaluate the error string eagerly
before creating the lambda to avoid referencing `exc` after the except scope.
Specifically, in the except handler build a local variable (e.g., error_str =
f"{type(exc).__name__}: {exc}") and then call loop.run_in_executor with a lambda
that uses `error_str` (and the existing symbols _dlq.enqueue, platform, user_id,
prompt, chat_id, thread_id, user_name) instead of referencing `exc` directly.
|
validate every single change, making sure we are not bloating for the sake of it, but it actually adds value to praisonai |
|
Claude finished @MervinPraison's task ββ View job N4 Inbound DLQ Feature - Final Architectural Validation βTodo Checklist:
FINAL VERDICT: HIGH VALUE, ZERO BLOAT βArchitectural Compliance vs AGENTS.mdβ Protocol-Driven Core Principle (4.1)
β No Performance Impact Principle (4.2)
β Agent-Centric Design Principle (4.4)
β Async-Safe & Multi-Agent Safe Principle (4.5)
β Safe Defaults Principle (4.6)
Value AssessmentBusiness Value: CRITICAL π―
Implementation Quality: EXCELLENT π
Code Quality: HIGH β¨
Comparison to Bloat Anti-Patterns
RECOMMENDATION: APPROVE AND MERGE πThis PR represents high-value, production-critical functionality implemented with zero architectural bloat. The inbound dead-letter queue solves a real customer pain point (message loss during LLM failures) using minimal, well-architected code that follows all PraisonAI design principles. Why This Adds Value:
Why This Is Not Bloat:
This exemplifies high-quality PraisonAI development: solving important problems with minimal, well-architected code that respects the framework's design principles. |
N4 β Inbound Dead-Letter Queue for Bot Messages
Problem
Today, when
BotSessionManager.chat()raises (LLM 5xx, transient timeout, provider rate-limit), the user's message is silently lost β the exception bubbles up, the bot adapter logs it, the message is gone.For unreliable LLM/network conditions in production this loses customer data.
Solution
Optional, opt-in inbound DLQ. Set
dlq=InboundDLQ(...)and PraisonAI persists failed messages to a SQLite file before the exception bubbles up. An operator can later inspect and replay them.Files
praisonai/bots/_dlq.py(new)InboundDLQ+DLQEntry: SQLite (stdlib), WAL mode, atomic locked writes, TTL +max_sizeeviction, async replay with handler callbackpraisonai/bots/_session.pydlq=param onBotSessionManager;chat()enqueues on exception then re-raisespraisonai/bots/__init__.pyInboundDLQ,DLQEntry,BotSessionManagerpraisonai/cli/commands/bot.pypraisonai bot dlq {list,replay,purge}subcommand grouptests/unit/bots/test_dlq.py(new)scripts/smoke_n4_real.py(new)PraisonAIDocs/docs/features/inbound-dlq.mdx(new)Test Results
CLI
praisonai bot dlq list # newest first praisonai bot dlq list --path /var/lib/x/dlq.sqlite -n 50 praisonai bot dlq replay --config bot.yaml praisonai bot dlq purge --yesBackward compatibility
dlq=, behaviour is bit-for-bit identical to before.BotSessionManageronly.Performance
if self._dlq is not Nonebranch on the exception path only.sqlite3only; no new dependency.InboundDLQis not imported until first reference.PraisonAI invariants verified
sqlite3is stdlib;InboundDLQlazily exposed.threading.Lock; replay uses async handler.What's next
This is part of the round-2 gap analysis (N1βN10). With N4 (this) and W1 (PR #1599) landed, the remaining roadmap is:
Verification
Summary by CodeRabbit
New Features
CLI
Tests