From ea93c78528a07e2a8f5edb9cd8e3fa99e04878ca Mon Sep 17 00:00:00 2001 From: Elizabeth Locke Date: Fri, 22 May 2026 15:00:09 -0400 Subject: [PATCH] Fix pdb / breakpoint() hang in workflow code (#1104) When debug_mode=True (or TEMPORAL_DEBUG=1), breakpoint() inside workflow code now opens an interactive pdb prompt -- including from a sandboxed workflow run under pytest. Four pieces: - Inline dispatch on the asyncio main thread (via loop.call_soon to avoid nesting inside the dispatch task's __step() and tripping Python 3.14's task-entry validation). - breakpoint removed from the sandbox's invalid builtins so the call reaches the worker hook. Nothing else is relaxed. - A Pdb subclass that lands at the workflow's own frame, suspends sandbox checks during each REPL interaction, and overrides q/Ctrl-D to continue the workflow instead of failing it with BdbQuit. - A defensive sys.breakpointhook that raises a clear RuntimeError when breakpoint() is called from a workflow worker thread without debug_mode, replacing the previous silent hang. When debug_mode is not set, the worker's dispatch and sandbox config are unchanged. Adds a README subsection on debugging workflows and five tests at tests/worker/test_breakpoint_hang.py. Verified on Python 3.13 and 3.14. Closes #1104. --- README.md | 74 ++++++++ temporalio/worker/_workflow.py | 272 ++++++++++++++++++++++----- tests/worker/test_breakpoint_hang.py | 222 ++++++++++++++++++++++ 3 files changed, 523 insertions(+), 45 deletions(-) create mode 100644 tests/worker/test_breakpoint_hang.py diff --git a/README.md b/README.md index d284fad59..d29625874 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ informal introduction to the features and their implementation. - [Customizing the Sandbox](#customizing-the-sandbox) - [Passthrough Modules](#passthrough-modules) - [Invalid Module Members](#invalid-module-members) + - [Debugging Workflows with `breakpoint()` / `pdb`](#debugging-workflows-with-breakpoint--pdb) - [Known Sandbox Issues](#known-sandbox-issues) - [Global Import/Builtins](#global-importbuiltins) - [Sandbox is not Secure](#sandbox-is-not-secure) @@ -1241,6 +1242,79 @@ my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_ See the API for more details on exact fields and their meaning. +##### Debugging Workflows with `breakpoint()` / `pdb` + +Setting `debug_mode=True` on the `Worker` (or `TEMPORAL_DEBUG=1` in the environment) routes workflow activations +onto the asyncio main thread instead of a worker thread pool. This lets `breakpoint()` and `pdb.set_trace()` +inside workflow code open an interactive REPL — without it, pdb hangs because its `input()` call would run on a +thread that does not own the controlling TTY. + +A minimal runnable example: + +```python +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker + + +@workflow.defn +class DebugMeWorkflow: + @workflow.run + async def run(self) -> str: + x = 42 + breakpoint() # interactive pdb prompt opens at this line + return f"x was {x}" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + async with Worker( + client, + task_queue="debug-me", + workflows=[DebugMeWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + DebugMeWorkflow.run, + id="debug-me-wf", + task_queue="debug-me", + task_timeout=timedelta(minutes=10), # see caveat below + ) + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Run with `python debug_me.py`, or under pytest with `pytest -s` (the `-s` flag disables pytest's stdin +capture). At the `(Pdb)` prompt you'll land at the line where `breakpoint()` was called, with workflow +locals in scope. Try `p x`, `n`, `c`, `q`. + +**Quitting cleanly.** Typing `q` or hitting Ctrl-D continues the workflow rather than raising `BdbQuit` +(which would fail the workflow task). To genuinely abort, kill the outer process with Ctrl-C. + +Two caveats when pausing at a breakpoint inside a workflow: + +1. **Workflow task timeout.** Temporal expires a workflow task after ~10 seconds by default. If you sit at the + `(Pdb)` prompt longer than that, the server reassigns the task and your workflow replays from the start when + you continue — re-hitting the breakpoint. Pass `task_timeout=timedelta(minutes=N)` to `execute_workflow` / + `start_workflow` to give yourself debugging headroom: + + ```python + await client.execute_workflow(MyWorkflow.run, ..., task_timeout=timedelta(minutes=10)) + ``` + +2. **Deterministic replay.** Workflows are deterministic and replay from history; any wall-clock pause violates + that contract. For post-mortem debugging without these caveats, use the [Replayer](#replayer) on a recorded + history instead of live debugging. + +A `breakpoint()` call from workflow code without `debug_mode` enabled raises a `RuntimeError` with a pointer to +this section, so the failure mode is loud rather than a silent hang. + ##### Known Sandbox Issues Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved. diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index bb489329a..f9326b54e 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -13,7 +13,7 @@ from collections.abc import Awaitable, Callable, MutableMapping, Sequence from dataclasses import dataclass from datetime import timedelta, timezone -from types import TracebackType +from types import FrameType, TracebackType import temporalio.api.common.v1 import temporalio.bridge.proto.workflow_activation @@ -48,6 +48,140 @@ # Set to true to log all activations and completions LOG_PROTOS = False +# Prefix used to detect threads in the workflow task ThreadPoolExecutor. +_WORKFLOW_THREAD_NAME_PREFIX = "temporal_workflow_" + +_ORIGINAL_BREAKPOINTHOOK = sys.breakpointhook + + +def _build_workflow_pdb_class() -> type: + """Build a Pdb subclass that suspends sandbox restrictions during the REPL. + + pdb's cmdloop touches ``readline.get_completer`` and other + sandbox-restricted internals each time it interacts with the user; we + bracket each interaction with ``_sandbox_unrestricted.value = True`` and + restore the previous value afterwards. Outside the REPL the sandbox + stays intact. + + Imports are function-local because ``pdb`` is a debug-only dependency + and pulls in ``cmd``/``bdb``/``linecache`` (no reason to pay that cost + at worker import time), and ``_sandbox_unrestricted`` is a private + sandbox internal that this hook is the only legitimate caller of. + """ + import pdb + + from temporalio.workflow._sandbox import _sandbox_unrestricted + + class _WorkflowPdb(pdb.Pdb): + # The `interaction` signature differs across Python versions: 3.10-3.12 + # typeshed names the second parameter `traceback: TracebackType | None`, + # while 3.13+ renames it `tb_or_exc` and widens the type to include + # `BaseException`. No single signature satisfies both stubs, so we + # suppress the override check. + def interaction( # type: ignore[override] + self, + frame: FrameType | None, + tb_or_exc: TracebackType | BaseException | None, + ) -> None: + prev = getattr(_sandbox_unrestricted, "value", False) + _sandbox_unrestricted.value = True + try: + super().interaction(frame, tb_or_exc) # type: ignore[arg-type] + finally: + _sandbox_unrestricted.value = prev + + # Override `q`/`quit`/`exit`/EOF (Ctrl-D) to behave like `continue`. + # Default pdb raises `BdbQuit`, which propagates as an uncaught + # exception out of workflow.run, fails the workflow task, and + # triggers a server retry storm during teardown. For a debug + # session the user almost always wants "stop debugging and let the + # workflow finish" — that's `continue`. Users who truly want to + # abort can Ctrl-C the outer shell. + def do_quit(self, arg: str) -> bool | None: + self.message( + "[Temporal] 'q'/Ctrl-D continues the workflow. " + "Ctrl-C the outer shell to abort." + ) + return self.do_continue(arg) + + do_q = do_exit = do_quit + do_EOF = do_quit + + return _WorkflowPdb + + +def _temporal_workflow_breakpoint_hook(*args: object, **kwargs: object) -> object: + if threading.current_thread().name.startswith(_WORKFLOW_THREAD_NAME_PREFIX): + raise RuntimeError( + "breakpoint() / pdb.set_trace() inside workflow code requires " + "debug_mode=True (or the TEMPORAL_DEBUG environment variable) on " + "the Worker. Without it the workflow runs on a thread pool and " + "pdb's interactive REPL cannot read stdin." + ) + if not temporalio.workflow.in_workflow(): + # Not inside a workflow activation — let pytest's wrapper, ipdb, or + # whatever else is configured handle it. + return _ORIGINAL_BREAKPOINTHOOK(*args, **kwargs) + # Inside a workflow: drop the user into pdb at the caller's frame (the + # workflow's `run` method, where breakpoint() was actually written) rather + # than landing inside this hook. Bypassing the configured breakpoint hook + # also avoids pytest's pdb wrapper, which assumes a test-code context and + # touches sandbox-restricted internals during its terminal-writer setup. + # `sandbox_unrestricted()` lifts member checks for the duration of the + # REPL so pdb's own initialization (readline, etc.) isn't blocked. + # `skip` tells pdb not to stop in our hook frame or the contextlib + # plumbing — without it pdb's first step lands at the `with` teardown + # instead of the user's next workflow line. + caller_frame = sys._getframe(1) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + pdb_cls = _build_workflow_pdb_class() + pdb_cls( + skip=[ + "temporalio.worker._workflow", + "temporalio.workflow._sandbox", + "contextlib", + ] + ).set_trace(caller_frame) + return None + + +def _install_workflow_breakpoint_hook() -> None: + if sys.breakpointhook is not _temporal_workflow_breakpoint_hook: + sys.breakpointhook = _temporal_workflow_breakpoint_hook + + +def _relax_sandbox_for_debugger(workflow_runner: WorkflowRunner) -> WorkflowRunner: + """Allow ``breakpoint()`` past the sandbox so it can reach the worker hook. + + The sandbox flags ``breakpoint`` as non-deterministic by default; without + this relaxation the call raises before our breakpoint hook can run. + Once inside the hook, the hook itself enters ``sandbox_unrestricted()`` + for the duration of the debugger session, so pdb's internals (readline, + os.environ, etc.) aren't blocked either — without permanently dropping + sandbox checks for the rest of workflow execution. + """ + from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner + + if not isinstance(workflow_runner, SandboxedWorkflowRunner): + return workflow_runner + + restrictions = workflow_runner.restrictions + invalid = restrictions.invalid_module_members + builtins_matcher = invalid.children.get("__builtins__") + if builtins_matcher is None or "breakpoint" not in builtins_matcher.use: + return workflow_runner + + new_use = set(builtins_matcher.use) - {"breakpoint"} + new_builtins = dataclasses.replace(builtins_matcher, use=new_use) + new_invalid = dataclasses.replace( + invalid, children={**invalid.children, "__builtins__": new_builtins} + ) + new_restrictions = dataclasses.replace( + restrictions, invalid_module_members=new_invalid + ) + return dataclasses.replace(workflow_runner, restrictions=new_restrictions) + + # Value was chosen abitrarily as a small number that allows some concurrency and prevents # large numbers of concurrent external storage operations causing resource contention. # This default limit is per workflow task activation and does not limit the total number @@ -96,6 +230,13 @@ def __init__( ) ) self._workflow_task_executor_user_provided = workflow_task_executor is not None + + # Debug mode (also enabled by TEMPORAL_DEBUG) disables deadlock + # detection, runs activations inline on the main thread, and lifts + # the sandbox restriction on breakpoint()/input() so pdb works. + self._debug_mode = bool(debug_mode or os.environ.get("TEMPORAL_DEBUG")) + if self._debug_mode: + workflow_runner = _relax_sandbox_for_debugger(workflow_runner) self._workflow_runner = workflow_runner self._unsandboxed_workflow_runner = unsandboxed_workflow_runner self._data_converter = data_converter @@ -129,9 +270,9 @@ def __init__( # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable # deadlock detection, otherwise set to 2 seconds - self._deadlock_timeout_seconds = ( - None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2 - ) + self._deadlock_timeout_seconds = None if self._debug_mode else 2 + + _install_workflow_breakpoint_hook() # Keep track of workflows that could not be evicted self._could_not_evict_count = 0 @@ -241,6 +382,34 @@ async def drain_poll_queue(self) -> None: except PollShutdownError: return + async def _activate_inline_for_debug( + self, + loop: asyncio.AbstractEventLoop, + workflow: _RunningWorkflow, + act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: + # Indirect through call_soon + a future so the activation runs outside + # the dispatch task's __step() context. Python 3.14 refuses to enter a + # task while another on the same thread is mid-step; suspending at the + # await below clears that state so workflow.activate can step its own + # task without collision. + future: asyncio.Future = loop.create_future() + + def run_inline() -> None: + # _run_once clears the running-loop registration on exit; restore + # the main loop so later code sees the right one. + main_loop = asyncio._get_running_loop() + try: + completion = workflow.activate(act) + future.set_result(completion) + except BaseException as e: + future.set_exception(e) + finally: + asyncio._set_running_loop(main_loop) + + loop.call_soon(run_inline) + return await future + async def _handle_activation( self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation ) -> None: @@ -330,35 +499,43 @@ async def _handle_activation( ) self._running_workflows[act.run_id] = workflow - # Run activation in separate thread so we can check if it's - # deadlocked - activate_task = asyncio.get_running_loop().run_in_executor( - self._workflow_task_executor, - workflow.activate, - act, - ) - - # Run activation task with deadlock timeout - try: - completion = await asyncio.wait_for( - activate_task, self._deadlock_timeout_seconds + if self._debug_mode: + # Inline on the main thread so pdb / breakpoint() can read + # stdin. The loop blocks during the activation — that's the + # intended single-stepping semantic. + completion = await self._activate_inline_for_debug( + asyncio.get_running_loop(), workflow, act ) - except asyncio.TimeoutError: - # Need to create the deadlock exception up here so it - # captures the trace now instead of later after we may have - # interrupted it - deadlock_exc = _DeadlockError.from_deadlocked_workflow( - workflow.instance, self._deadlock_timeout_seconds + else: + # Run activation in separate thread so we can check if it's + # deadlocked + activate_task = asyncio.get_running_loop().run_in_executor( + self._workflow_task_executor, + workflow.activate, + act, ) - # When we deadlock, we will raise an exception to fail - # the task. But before we do that, we want to try to - # interrupt the thread and put this activation task on - # the workflow so that the successive eviction can wait - # on it before trying to evict. - workflow.attempt_deadlock_interruption() - # Set the task and raise - workflow.deadlocked_activation_task = activate_task - raise deadlock_exc from None + + # Run activation task with deadlock timeout + try: + completion = await asyncio.wait_for( + activate_task, self._deadlock_timeout_seconds + ) + except asyncio.TimeoutError: + # Need to create the deadlock exception up here so it + # captures the trace now instead of later after we may have + # interrupted it + deadlock_exc = _DeadlockError.from_deadlocked_workflow( + workflow.instance, self._deadlock_timeout_seconds + ) + # When we deadlock, we will raise an exception to fail + # the task. But before we do that, we want to try to + # interrupt the thread and put this activation task on + # the workflow so that the successive eviction can wait + # on it before trying to evict. + workflow.attempt_deadlock_interruption() + # Set the task and raise + workflow.deadlocked_activation_task = activate_task + raise deadlock_exc from None except Exception as err: if isinstance(err, _DeadlockError): @@ -576,22 +753,27 @@ async def _handle_cache_eviction( handle_eviction_task: asyncio.Future | None = None while True: try: - # We only create the eviction task if we haven't already or - # it is done. This is because if it already is running and - # timed out, it's still running (and holding on to a - # thread). But if did complete running but failed with - # another error, we want to re-create the task. - if not handle_eviction_task or handle_eviction_task.done(): - handle_eviction_task = ( - asyncio.get_running_loop().run_in_executor( - self._workflow_task_executor, - workflow.activate, - act, + if self._debug_mode: + await self._activate_inline_for_debug( + asyncio.get_running_loop(), workflow, act + ) + else: + # We only create the eviction task if we haven't already or + # it is done. This is because if it already is running and + # timed out, it's still running (and holding on to a + # thread). But if did complete running but failed with + # another error, we want to re-create the task. + if not handle_eviction_task or handle_eviction_task.done(): + handle_eviction_task = ( + asyncio.get_running_loop().run_in_executor( + self._workflow_task_executor, + workflow.activate, + act, + ) ) + await asyncio.wait_for( + handle_eviction_task, self._deadlock_timeout_seconds ) - await asyncio.wait_for( - handle_eviction_task, self._deadlock_timeout_seconds - ) # Break if it succeeds break except BaseException as err: diff --git a/tests/worker/test_breakpoint_hang.py b/tests/worker/test_breakpoint_hang.py new file mode 100644 index 000000000..7eb81c83d --- /dev/null +++ b/tests/worker/test_breakpoint_hang.py @@ -0,0 +1,222 @@ +"""Tests for the pdb / breakpoint() fix in workflow code (#1104).""" + +from __future__ import annotations + +import pdb +import sys +import threading +import uuid +from types import FrameType +from typing import Any +from unittest.mock import patch + +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker +from temporalio.worker._workflow import _temporal_workflow_breakpoint_hook + + +@workflow.defn(sandboxed=False) +class ThreadCaptureWorkflow: + """Returns the name of the thread the workflow runs on. + + `sandboxed=False` so `threading.current_thread()` isn't intercepted — + these tests are about thread placement, not sandbox behavior. + """ + + @workflow.run + async def run(self) -> str: + return threading.current_thread().name + + +async def test_workflow_runs_on_pool_thread_without_debug_mode(client: Client): + """Production behavior unchanged: workflows run on `temporal_workflow_*`.""" + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ThreadCaptureWorkflow], + ): + thread_name = await client.execute_workflow( + ThreadCaptureWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + main_name = threading.main_thread().name + assert thread_name != main_name, ( + f"workflow ran on the main thread ({main_name!r}) — production behavior changed" + ) + assert thread_name.startswith("temporal_workflow_"), ( + f"expected pool thread, got {thread_name!r}" + ) + + +async def test_workflow_runs_on_main_thread_in_debug_mode(client: Client): + """debug_mode=True moves workflow activation to the asyncio main thread + so pdb's input() reaches the controlling TTY.""" + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ThreadCaptureWorkflow], + debug_mode=True, + ): + thread_name = await client.execute_workflow( + ThreadCaptureWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + main_name = threading.main_thread().name + assert thread_name == main_name, ( + f"expected workflow on main thread ({main_name!r}) in debug mode; " + f"got {thread_name!r}" + ) + + +@workflow.defn +class SandboxedBreakpointWorkflow: + """Sandboxed workflow that calls breakpoint() — verifies the fix works + without requiring users to switch to UnsandboxedWorkflowRunner.""" + + @workflow.run + async def run(self) -> str: + bird = "chicken" + breakpoint() + return f"bird was {bird}" + + +async def test_breakpoint_works_in_sandboxed_workflow_in_debug_mode(client: Client): + """breakpoint() inside a sandboxed workflow reaches the debugger when + debug_mode=True — no need to switch to UnsandboxedWorkflowRunner. + + Patches `pdb.Pdb.set_trace` with a stub so CI doesn't hang on an + interactive prompt. Reaching the stub on `MainThread` with the + workflow's `run` frame proves the full path (sandbox relaxation -> + our hook -> pdb) works through the sandbox. Also verifies workflow + locals (`bird`) are visible in the captured frame. + """ + captured: dict[str, object] = {} + + def stub_set_trace(_self: pdb.Pdb, frame: FrameType | None = None) -> None: + captured["thread"] = threading.current_thread().name + captured["frame_name"] = frame.f_code.co_name if frame else None + captured["bird"] = frame.f_locals.get("bird") if frame else None + captured["called"] = True + + task_queue = f"tq-{uuid.uuid4()}" + with patch.object(pdb.Pdb, "set_trace", stub_set_trace): + async with Worker( + client, + task_queue=task_queue, + workflows=[SandboxedBreakpointWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + SandboxedBreakpointWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "bird was chicken", ( + f"workflow did not complete; breakpoint() likely raised inside the sandbox: " + f"result={result!r}" + ) + assert captured.get("called"), "pdb.Pdb.set_trace was never reached" + assert captured["thread"] == threading.main_thread().name, ( + f"breakpoint landed on {captured['thread']!r}, not the main thread" + ) + assert captured["frame_name"] == "run", ( + f"breakpoint stopped at frame {captured['frame_name']!r}, " + f"expected the workflow's `run` method" + ) + assert captured["bird"] == "chicken", ( + f"workflow local `bird` not visible in pdb frame: got {captured['bird']!r}" + ) + + +async def test_breakpoint_quit_continues_workflow_in_debug_mode(client: Client): + """Typing `q` (or hitting Ctrl-D) in a workflow pdb session should + continue the workflow rather than failing the workflow task with + BdbQuit. The hook overrides `do_quit`/`do_EOF` to call `do_continue` + instead, so a debug session ends cleanly. + + Drives pdb via `cmdqueue` so no real stdin is needed. The first + iteration of cmdloop sees `q`, which dispatches to our overridden + `do_quit` -> `do_continue`. The workflow then completes normally. + """ + captured: dict[str, object] = {} + + class _AutoQuitPdb(pdb.Pdb): + """Pdb subclass that pre-queues `q` and captures frame state on + entry to `interaction`.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.cmdqueue = ["q"] + + def interaction( # type: ignore[override] + self, frame: FrameType | None, traceback: Any + ) -> Any: + if frame is not None: + captured["frame_name"] = frame.f_code.co_name + captured["bird"] = frame.f_locals.get("bird") + return super().interaction(frame, traceback) + + task_queue = f"tq-{uuid.uuid4()}" + with patch("pdb.Pdb", _AutoQuitPdb): + async with Worker( + client, + task_queue=task_queue, + workflows=[SandboxedBreakpointWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + SandboxedBreakpointWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "bird was chicken", ( + f"workflow did not complete after `q`; `BdbQuit` likely propagated: " + f"result={result!r}" + ) + assert captured.get("frame_name") == "run", ( + f"pdb didn't stop in workflow.run frame: got {captured.get('frame_name')!r}" + ) + assert captured.get("bird") == "chicken", ( + f"workflow local `bird` not visible at pdb breakpoint: " + f"got {captured.get('bird')!r}" + ) + + +def test_breakpoint_hook_raises_on_workflow_thread(client: Client): + """The defensive hook fails loudly when breakpoint() is called from a + `temporal_workflow_*` thread without debug mode.""" + # Constructing a Worker installs the hook. We don't need to run anything. + Worker( + client, + workflows=[ThreadCaptureWorkflow], + task_queue=f"tq-{uuid.uuid4()}", + ) + assert sys.breakpointhook is _temporal_workflow_breakpoint_hook + + captured: list[BaseException] = [] + + def call_breakpoint_on_worker_thread() -> None: + try: + sys.breakpointhook() + except BaseException as e: + captured.append(e) + + t = threading.Thread( + target=call_breakpoint_on_worker_thread, + name="temporal_workflow__test", + ) + t.start() + t.join() + + assert len(captured) == 1 + assert isinstance(captured[0], RuntimeError) + assert "debug_mode=True" in str(captured[0])