Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,14 @@ my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_

See the API for more details on exact fields and their meaning.

##### Debugging Workflows in the Sandbox

To attach an IDE debugger (e.g. the VSCode Python debugger or PyCharm) to workflow code running inside the
sandbox, set `debug_mode=True` on the `Worker`, or set the `TEMPORAL_DEBUG` environment variable to a
truthy value. This disables the workflow deadlock detector — which would otherwise interrupt a paused
workflow at the 2-second timeout — and adds `_pydevd_bundle` (the module IDE debuggers inject at runtime)
to the sandbox passthrough list so breakpoints in workflow code are hit.

##### Known Sandbox Issues

Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved.
Expand Down
24 changes: 19 additions & 5 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from temporalio.api.enums.v1 import WorkflowTaskFailedCause
from temporalio.bridge.worker import PollShutdownError
from temporalio.converter import StorageDriverStoreContext, StorageDriverWorkflowInfo
from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner

from . import _command_aware_visitor
from ._interceptor import (
Expand Down Expand Up @@ -85,6 +86,9 @@ def __init__(
encode_headers: bool,
max_workflow_task_external_storage_concurrency: int,
) -> None:
# Debug mode is enabled if specified or if the TEMPORAL_DEBUG env var is truthy
debug_mode = debug_mode or bool(os.environ.get("TEMPORAL_DEBUG"))

self._bridge_worker = bridge_worker
self._namespace = namespace
self._task_queue = task_queue
Expand All @@ -96,7 +100,19 @@ def __init__(
)
)
self._workflow_task_executor_user_provided = workflow_task_executor is not None

# If debug mode is enabled, ensure that the debugpy (https://github.com/microsoft/debugpy)
# import is added as a passthrough
if debug_mode and isinstance(workflow_runner, SandboxedWorkflowRunner):
workflow_runner = dataclasses.replace(
workflow_runner,
restrictions=workflow_runner.restrictions.with_passthrough_modules(
"_pydevd_bundle"
),
)

self._workflow_runner = workflow_runner

self._unsandboxed_workflow_runner = unsandboxed_workflow_runner
self._data_converter = data_converter
# Build the interceptor classes and collect extern functions
Expand Down Expand Up @@ -127,11 +143,9 @@ def __init__(
)
self._throw_after_activation: Exception | None = None

# If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable
# deadlock detection, otherwise set to 2 seconds
self._deadlock_timeout_seconds = (
None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2
)
# If debug mode is enabled, disable deadlock detection
# otherwise set to 2 seconds
self._deadlock_timeout_seconds = None if debug_mode else 2

# Keep track of workflows that could not be evicted
self._could_not_evict_count = 0
Expand Down
57 changes: 57 additions & 0 deletions tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import concurrent.futures
import multiprocessing
import multiprocessing.context
import os
import uuid
from collections.abc import Awaitable, Callable, Sequence
from contextlib import contextmanager
from datetime import timedelta
from typing import Any
from urllib.request import urlopen
Expand Down Expand Up @@ -57,6 +59,7 @@
WorkerTuner,
WorkflowSlotInfo,
)
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
from temporalio.workflow import DynamicWorkflowConfig, VersioningIntent
from tests.helpers import (
assert_eventually,
Expand Down Expand Up @@ -1650,3 +1653,57 @@ def test_worker_config_matches_init_params():
f"Missing from config: {init_params - config_keys}. "
f"Extra in config: {config_keys - init_params}."
)


async def test_worker_debug_mode(client: Client):
worker = Worker(
client,
workflows=[SimpleWorkflow],
task_queue=f"task-queue-{uuid.uuid4()}",
)
assert worker._workflow_worker
assert worker._workflow_worker._deadlock_timeout_seconds == 2
assert isinstance(worker._workflow_worker._workflow_runner, SandboxedWorkflowRunner)
assert (
"_pydevd_bundle"
not in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules
)

worker = Worker(
client,
workflows=[SimpleWorkflow],
task_queue=f"task-queue-{uuid.uuid4()}",
debug_mode=True,
)
assert worker._workflow_worker
assert worker._workflow_worker._deadlock_timeout_seconds is None
assert isinstance(worker._workflow_worker._workflow_runner, SandboxedWorkflowRunner)
assert (
"_pydevd_bundle"
in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules
)

@contextmanager
def debug_envvar():
os.environ["TEMPORAL_DEBUG"] = "true"
try:
yield
finally:
os.environ.pop("TEMPORAL_DEBUG")

with debug_envvar():
worker = Worker(
client,
workflows=[SimpleWorkflow],
task_queue=f"task-queue-{uuid.uuid4()}",
)
assert worker._workflow_worker
assert worker._workflow_worker._deadlock_timeout_seconds is None
assert isinstance(
worker._workflow_worker._workflow_runner,
SandboxedWorkflowRunner,
)
assert (
"_pydevd_bundle"
in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules
)
Loading