Skip to content

Commit 3dadf7f

Browse files
authored
Add xdist parallel test task (#1415)
* Add xdist parallel test task
* Stabilize flaky parallel workflow tests
* Make poe test parallel by default
* Use poe test in CI
* Stabilize CI workflow tests
* Stabilize time-skipping and process cancel tests
* Stabilize quick activity cancellation test
1 parent c355064 commit 3dadf7f

File tree

8 files changed

+79
-34
lines changed

8 files changed

+79
-34
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ jobs:
163163
- run: poe build-develop
164164
- run: poe lint
165165
- run: mkdir junit-xml
166-
- run: poe test -s --junit-xml=junit-xml/latest-deps.xml
166+
- run: poe test -s --junit-xml=junit-xml/latest-deps.xml
167167
timeout-minutes: 15
168168
- name: "Upload junit-xml artifacts"
169169
uses: actions/upload-artifact@v4

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2066,6 +2066,13 @@ To execute tests:
20662066
poe test
20672067
```
20682068

2069+
`poe test` spreads tests across multiple worker processes by default. If you
2070+
need a serial run for debugging, invoke pytest directly:
2071+
2072+
```bash
2073+
uv run pytest
2074+
```
2075+
20692076
By default, the tests run against [Temporalite](https://github.com/temporalio/temporalite). To run against the time-skipping test
20702077
server, pass `--workflow-environment time-skipping`. To run against the `default` namespace of an already-running
20712078
server, pass the `host:port` to `--workflow-environment`. You can also use regular pytest arguments. For example, here's how

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ dev = [
7575
"openinference-instrumentation-google-adk>=0.1.8",
7676
"googleapis-common-protos==1.70.0",
7777
"pytest-rerunfailures>=16.1",
78+
"pytest-xdist>=3.6,<4",
7879
"moto[s3,server]>=5",
7980
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
8081
"opentelemetry-semantic-conventions>=0.40b0,<1",
@@ -118,7 +119,7 @@ lint-types = [
118119
{ cmd = "uv run basedpyright" },
119120
]
120121
run-bench = "uv run python scripts/run_bench.py"
121-
test = "uv run pytest"
122+
test = "uv run pytest -n auto --dist=worksteal"
122123

123124

124125
[tool.pytest.ini_options]

tests/conftest.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,16 @@ async def worker(
187187
@pytest.hookimpl(hookwrapper=True, trylast=True)
188188
def pytest_cmdline_main(config): # type: ignore[reportMissingParameterType, reportUnusedParameter]
189189
result = yield
190-
if result.get_result() == 0:
190+
exit_code = result.get_result()
191+
numprocesses = getattr(config.option, "numprocesses", None)
192+
running_with_xdist = hasattr(config, "workerinput") or numprocesses not in (
193+
None,
194+
0,
195+
"0",
196+
)
197+
if exit_code == 0 and not running_with_xdist:
191198
os._exit(0)
192-
return result.get_result()
199+
return exit_code
193200

194201

195202
CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT = 50

tests/nexus/test_workflow_caller.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1101,10 +1101,13 @@ async def test_async_response(
11011101
return
11021102

11031103
handler_wf_info = await handler_wf_handle.describe()
1104-
assert handler_wf_info.status in [
1104+
expected_statuses = [
11051105
WorkflowExecutionStatus.RUNNING,
11061106
WorkflowExecutionStatus.COMPLETED,
11071107
]
1108+
if request_cancel:
1109+
expected_statuses.append(WorkflowExecutionStatus.CANCELED)
1110+
assert handler_wf_info.status in expected_statuses
11081111
await assert_handler_workflow_has_link_to_caller_workflow(
11091112
caller_wf_handle, handler_wf_handle
11101113
)
@@ -1508,6 +1511,9 @@ async def test_workflow_run_operation_can_execute_workflow_before_starting_backi
15081511
client: Client,
15091512
env: WorkflowEnvironment,
15101513
):
1514+
if env.supports_time_skipping:
1515+
pytest.skip("Nexus tests don't work with time-skipping server")
1516+
15111517
task_queue = str(uuid.uuid4())
15121518
async with Worker(
15131519
client,

tests/worker/test_activity.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ async def test_sync_activity_process_cancel(
639639
picklable_activity_wait_cancel,
640640
cancel_after_ms=100,
641641
wait_for_cancellation=True,
642-
heartbeat_timeout_ms=3000,
642+
heartbeat_timeout_ms=5000,
643643
worker_config={"activity_executor": executor},
644644
shared_state_manager=shared_state_manager,
645645
)

tests/worker/test_workflow.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,14 @@ async def test_workflow_history_info(
331331
# because just a query will have a stale representation of history
332332
# counts, but signal forces a new WFT.
333333
await handle.signal(HistoryInfoWorkflow.bunch_of_events, 1)
334-
new_info = await handle.query(HistoryInfoWorkflow.get_history_info)
335-
assert new_info.history_length > continue_as_new_suggest_history_count
336-
assert new_info.history_size > orig_info.history_size
337-
assert new_info.continue_as_new_suggested
334+
335+
async def history_info_updated() -> None:
336+
new_info = await handle.query(HistoryInfoWorkflow.get_history_info)
337+
assert new_info.history_length > continue_as_new_suggest_history_count
338+
assert new_info.history_size > orig_info.history_size
339+
assert new_info.continue_as_new_suggested
340+
341+
await assert_eventually(history_info_updated)
338342

339343

340344
@workflow.defn
@@ -5317,7 +5321,11 @@ async def any_task_completed(handle: WorkflowHandle) -> bool:
53175321
# because we should have timer-done poll completions every 100ms
53185322
worker.client = other_env.client
53195323
# Now confirm the other workflow has started
5320-
await assert_eq_eventually(True, lambda: any_task_completed(handle2))
5324+
await assert_eq_eventually(
5325+
True,
5326+
lambda: any_task_completed(handle2),
5327+
timeout=timedelta(seconds=30),
5328+
)
53215329
# Terminate both
53225330
await handle1.terminate()
53235331
await handle2.terminate()
@@ -8047,8 +8055,10 @@ async def test_quick_activity_swallows_cancellation(client: Client):
80478055
activities=[short_activity_async],
80488056
activity_executor=concurrent.futures.ThreadPoolExecutor(max_workers=1),
80498057
) as worker:
8050-
for i in range(10):
8051-
wf_duration = random.uniform(5.0, 15.0)
8058+
# Keep this deterministic and bounded. The original randomized 10-iteration
8059+
# version could exceed the per-test timeout on slower CI hosts if
8060+
# cancellation was delayed a few times in a row.
8061+
for i, wf_duration in enumerate((5.0, 7.5, 10.0)):
80528062
wf_handle = await client.start_workflow(
80538063
QuickActivityWorkflow.run,
80548064
id=f"short_activity_wf_id-{i}",
@@ -8537,39 +8547,29 @@ def emit(self, record: logging.LogRecord) -> None:
85378547
async def test_disable_logger_sandbox(
85388548
client: Client,
85398549
):
8540-
logger = workflow.logger.logger
8541-
handler = CustomLogHandler()
8542-
with LogHandler.apply(logger, handler):
8550+
async def execute_with_new_worker(*, disable_sandbox: bool) -> None:
8551+
workflow.logger.unsafe_disable_sandbox(disable_sandbox)
85438552
async with new_worker(
85448553
client,
85458554
DisableLoggerSandbox,
85468555
activities=[],
85478556
) as worker:
8548-
with pytest.raises(WorkflowFailureError):
8549-
await client.execute_workflow(
8550-
DisableLoggerSandbox.run,
8551-
id=f"workflow-{uuid.uuid4()}",
8552-
task_queue=worker.task_queue,
8553-
run_timeout=timedelta(seconds=1),
8554-
retry_policy=RetryPolicy(maximum_attempts=1),
8555-
)
8556-
workflow.logger.unsafe_disable_sandbox()
85578557
await client.execute_workflow(
85588558
DisableLoggerSandbox.run,
85598559
id=f"workflow-{uuid.uuid4()}",
85608560
task_queue=worker.task_queue,
85618561
run_timeout=timedelta(seconds=1),
85628562
retry_policy=RetryPolicy(maximum_attempts=1),
85638563
)
8564-
workflow.logger.unsafe_disable_sandbox(False)
8565-
with pytest.raises(WorkflowFailureError):
8566-
await client.execute_workflow(
8567-
DisableLoggerSandbox.run,
8568-
id=f"workflow-{uuid.uuid4()}",
8569-
task_queue=worker.task_queue,
8570-
run_timeout=timedelta(seconds=1),
8571-
retry_policy=RetryPolicy(maximum_attempts=1),
8572-
)
8564+
8565+
logger = workflow.logger.logger
8566+
handler = CustomLogHandler()
8567+
with LogHandler.apply(logger, handler):
8568+
with pytest.raises(WorkflowFailureError):
8569+
await execute_with_new_worker(disable_sandbox=False)
8570+
await execute_with_new_worker(disable_sandbox=True)
8571+
with pytest.raises(WorkflowFailureError):
8572+
await execute_with_new_worker(disable_sandbox=False)
85738573

85748574

85758575
@workflow.defn

uv.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)