Race condition in Runner.__aenter__ causes CancelScope errors with concurrent flow runs #20117

@zzstoatzz

Description

Summary

While testing the fix for #20108 (queue-scoped workers missing force-rescheduled flow runs), we observed intermittent RuntimeError("Each CancelScope may only be used for a single 'with' block") crashes when ProcessWorker executes multiple flow runs concurrently.

Bug Description

There's a race condition in Runner.__aenter__ where multiple concurrent tasks can attempt to enter the same _runs_task_group CancelScope.

Location: src/prefect/runner/runner.py:1582-1584

if not hasattr(self, "_runs_task_group") or not self._runs_task_group:
    self._runs_task_group: anyio.abc.TaskGroup = anyio.create_task_group()
await self._exit_stack.enter_async_context(self._runs_task_group)

Race scenario:

  1. Task A: passes the hasattr check, creates task group
  2. Task B: passes the check (task group now exists), skips creation
  3. Task A: enters the task group context via exit_stack
  4. Task B: tries to enter THE SAME task group context → ERROR

This manifests as:

RuntimeError("Each CancelScope may only be used for a single 'with' block")

We also observed related race conditions:

  • RuntimeError: Cannot open a client instance more than once. (httpx client)
  • AttributeError: 'TaskGroup' object has no attribute '_exceptions' (TaskGroup cleanup)

Git History Analysis

The problematic code pattern traces back to two key commits:

  1. abea91cf1b5 (July 2024) - PR #14563 "syncify serve"

    • Introduced the lazy initialization pattern with hasattr check to allow Runner re-entry
    • Original code used await self._runs_task_group.__aenter__() directly
  2. 378fc4f6b72 (May 2025) - PR #17973 "Refactor Runner to receive cancellation signals via websocket"

    • Changed to use AsyncExitStack for context management
    • Changed await self._runs_task_group.__aenter__() to await self._exit_stack.enter_async_context(self._runs_task_group)

The combination of lazy task group creation (allowing reuse) + entering via exit_stack creates the race window.

Potential Fix

The _runs_task_group should either:

  1. Be created fresh each time in __aenter__ (not reused), or
  2. Be protected by a lock to prevent concurrent entry, or
  3. Use a different pattern that doesn't rely on the check-then-enter sequence

Similar logic applies to _loops_task_group at lines 1586-1587.

Reproduction

The bug is probabilistic but reproduces reliably with high concurrency. On our test runs, ~3% of flow runs crashed with the CancelScope error.

Reproduction Script
"""
Repro for CancelScope race condition in Runner via ProcessWorker.

BUG SUMMARY:
When multiple flow runs are submitted concurrently to a ProcessWorker,
there's a race condition in Runner.__aenter__ where multiple tasks can try to
enter the same _runs_task_group CancelScope.

ERROR: RuntimeError("Each CancelScope may only be used for a single 'with' block")

LOCATION: src/prefect/runner/runner.py:1582-1584

Run with: uv run python repros/cancel_scope_race.py

Requires a local Prefect server running (prefect server start) or staging.
"""

import asyncio
import sys
import logging
from pathlib import Path
from uuid import UUID

# Suppress noisy logs before imports
logging.getLogger("prefect").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("websockets").setLevel(logging.CRITICAL)

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.workers.process import ProcessWorker


@flow
def dummy_flow(x: int = 0):
    """Simple flow that completes quickly."""
    return x


async def deploy_flow(work_pool: str) -> UUID:
    """Deploy the flow and return deployment ID."""
    runnable = await dummy_flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="cancel_scope_race.py:dummy_flow",
    )
    deployment_id = await runnable.deploy(
        name="cancel-scope-repro",
        work_pool_name=work_pool,
        build=False,
        push=False,
    )
    return deployment_id


async def create_flow_runs(deployment_id: UUID, count: int) -> list[UUID]:
    """Create scheduled flow runs from deployment."""
    async with get_client() as client:
        tasks = [
            client.create_flow_run_from_deployment(
                deployment_id=deployment_id,
                parameters={"x": i},
            )
            for i in range(count)
        ]
        flow_runs = await asyncio.gather(*tasks)
        return [fr.id for fr in flow_runs]


async def check_for_cancel_scope_crashes() -> int:
    """Check for flow runs that crashed with CancelScope error."""
    async with get_client() as client:
        crashed_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=["CRASHED"])
                )
            ),
            limit=100,
        )
        count = 0
        for run in crashed_runs:
            if run.state and run.state.message and "CancelScope" in run.state.message:
                count += 1
                print(f"    FOUND: {run.id} - {run.state.message[:80]}...")
        return count


async def run_test(work_pool: str, deployment_id: UUID, num_runs: int) -> tuple[int, int]:
    """Run a single test iteration. Returns (submitted, cancel_scope_errors)."""

    print(f"  Creating {num_runs} flow runs...")
    flow_run_ids = await create_flow_runs(deployment_id, num_runs)
    print(f"  Created {len(flow_run_ids)} flow runs")

    # Use ProcessWorker with high concurrency limit
    worker = ProcessWorker(
        work_pool_name=work_pool,
        limit=50,
    )

    cancel_scope_errors = 0
    submitted = 0

    try:
        async with worker:
            print("  Getting and submitting flow runs...")
            results = await worker.get_and_submit_flow_runs()
            submitted = len(results)
            print(f"  Submitted {submitted} flow runs")

            # Wait for runs to complete/fail
            await asyncio.sleep(5)

    except Exception as e:
        if "CancelScope" in str(e):
            print(f"  CAUGHT CancelScope error: {e}")
            cancel_scope_errors += 1
        else:
            print(f"  Error: {type(e).__name__}: {str(e)[:100]}")

    return submitted, cancel_scope_errors


async def main():
    work_pool = "test-cancel-scope"
    num_runs = 20
    num_iterations = 3

    print("CancelScope Race Condition Repro")
    print("=" * 50)
    print(f"Work pool: {work_pool}")
    print(f"Concurrent flow runs: {num_runs}")
    print()

    # Ensure work pool exists
    async with get_client() as client:
        try:
            await client.read_work_pool(work_pool)
            print(f"Using existing work pool: {work_pool}")
        except Exception:
            from prefect.client.schemas.actions import WorkPoolCreate
            await client.create_work_pool(
                work_pool=WorkPoolCreate(name=work_pool, type="process")
            )
            print(f"Created work pool: {work_pool}")

    # Deploy the flow
    print("Deploying flow...")
    deployment_id = await deploy_flow(work_pool)
    print(f"Deployment ID: {deployment_id}")
    print()

    total_submitted = 0
    total_errors = 0

    for i in range(num_iterations):
        print(f"--- Iteration {i + 1}/{num_iterations} ---")
        submitted, errors = await run_test(work_pool, deployment_id, num_runs)
        total_submitted += submitted
        total_errors += errors

        # Check for crashed runs with CancelScope
        crashed_with_cancel = await check_for_cancel_scope_crashes()
        total_errors += crashed_with_cancel

        print()
        await asyncio.sleep(1)

    print("=" * 50)
    print("RESULTS:")
    print(f"  Total submitted: {total_submitted}")
    print(f"  CancelScope errors: {total_errors}")

    if total_errors > 0:
        print("\nBUG CONFIRMED: CancelScope race condition exists.")
        print("See src/prefect/runner/runner.py:1582-1584")
        return 1
    else:
        print("\nNo CancelScope errors this run.")
        print("Race is probabilistic - try increasing num_runs or iterations.")
        return 0


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)

Environment

  • Prefect version: 3.x (main branch)
  • Python: 3.12
  • OS: macOS
