
Commit 7b33c4c

desertaxle and claude committed
Add arun_deployment and replace sync_compatible with async_dispatch
This change follows the intent of issue #15008 to replace implicit sync/async conversion with explicit, type-safe alternatives.

Changes:
- Add `arun_deployment` as an explicit async function for running deployments
- Replace `@sync_compatible` with `@async_dispatch` on `run_deployment`
- `run_deployment` now dispatches to `arun_deployment` in async context
- Sync context uses `SyncPrefectClient` directly (no event loop magic)
- Export `arun_deployment` from `prefect.deployments`
- Add comprehensive tests for both sync and async behavior

The `run_deployment.aio` attribute is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 9864bf2 commit 7b33c4c
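For readers unfamiliar with the dispatch pattern the commit message describes, the sketch below is a simplified stand-in for what a decorator like `@async_dispatch` does, not Prefect's actual implementation: use the async implementation when an event loop is already running, fall back to the sync body otherwise, and keep an explicit `.aio` escape hatch. The name `simple_async_dispatch` is invented purely for illustration.

```python
# Illustrative only: a simplified stand-in for the async/sync dispatch idea,
# NOT the implementation in prefect._internal.compatibility.async_dispatch.
import asyncio
from functools import wraps
from typing import Any, Callable, Coroutine


def simple_async_dispatch(async_impl: Callable[..., Coroutine[Any, Any, Any]]):
    """Route calls to `async_impl` when an event loop is running, else run the sync body."""

    def decorator(sync_impl: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(sync_impl)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No running loop: plain synchronous call.
                return sync_impl(*args, **kwargs)
            # Running loop: return the coroutine so the caller can await it.
            return async_impl(*args, **kwargs)

        # Explicit escape hatch, mirroring the preserved `run_deployment.aio`.
        wrapper.aio = async_impl
        return wrapper

    return decorator
```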

File tree

3 files changed: +564 −10 lines


src/prefect/deployments/__init__.py

Lines changed: 3 additions & 2 deletions
@@ -3,18 +3,19 @@
 
 
 if TYPE_CHECKING:
-    from .flow_runs import run_deployment
+    from .flow_runs import arun_deployment, run_deployment
     from .base import initialize_project
     from .runner import deploy
 
 _public_api: dict[str, tuple[str, str]] = {
     "initialize_project": (__spec__.parent, ".base"),
+    "arun_deployment": (__spec__.parent, ".flow_runs"),
     "run_deployment": (__spec__.parent, ".flow_runs"),
     "deploy": (__spec__.parent, ".runner"),
 }
 
 # Declare API for type-checkers
-__all__ = ["initialize_project", "deploy", "run_deployment"]
+__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]
 
 
 def __getattr__(attr_name: str) -> object:
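As context for the `_public_api` entry above, the snippet below is a generic sketch of the PEP 562 lazy re-export pattern this module appears to use; the real `__getattr__` body is not shown in this diff, so the resolution logic here is an assumption.

```python
# Illustrative only: a generic PEP 562 lazy re-export, assumed to resemble
# how `prefect.deployments.__getattr__` resolves names in `_public_api`.
from importlib import import_module

_public_api: dict[str, tuple[str, str]] = {
    "arun_deployment": ("prefect.deployments", ".flow_runs"),
    "run_deployment": ("prefect.deployments", ".flow_runs"),
}


def __getattr__(attr_name: str) -> object:
    try:
        parent, submodule = _public_api[attr_name]
    except KeyError:
        raise AttributeError(f"module {__name__!r} has no attribute {attr_name!r}")
    # Import the submodule on first access and pull the requested attribute.
    return getattr(import_module(submodule, package=parent), attr_name)
```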

src/prefect/deployments/flow_runs.py

Lines changed: 226 additions & 7 deletions
@@ -6,16 +6,17 @@
 from opentelemetry import trace
 
 import prefect
+from prefect._internal.compatibility.async_dispatch import async_dispatch
 from prefect._result_records import ResultRecordMetadata
+from prefect.client.orchestration import get_client
 from prefect.client.schemas import FlowRun, TaskRunResult
-from prefect.client.utilities import inject_client
+from prefect.client.utilities import get_or_create_client
 from prefect.context import FlowRunContext, TaskRunContext
 from prefect.logging import get_logger
 from prefect.states import Pending, Scheduled
 from prefect.tasks import Task
 from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
 from prefect.types._datetime import now
-from prefect.utilities.asyncutils import sync_compatible
 from prefect.utilities.slugify import slugify
 
 
@@ -45,11 +46,8 @@ def _is_instrumentation_enabled() -> bool:
 logger: "logging.Logger" = get_logger(__name__)
 
 
-@sync_compatible
-@inject_client
-async def run_deployment(
+async def arun_deployment(
     name: Union[str, UUID],
-    client: Optional["PrefectClient"] = None,
     parameters: Optional[dict[str, Any]] = None,
     scheduled_time: Optional[datetime] = None,
     flow_run_name: Optional[str] = None,
@@ -60,9 +58,10 @@ async def run_deployment(
     work_queue_name: Optional[str] = None,
     as_subflow: Optional[bool] = True,
     job_variables: Optional[dict[str, Any]] = None,
+    client: Optional["PrefectClient"] = None,
 ) -> "FlowRun":
     """
-    Create a flow run for a deployment and return it after completion or a timeout.
+    Asynchronously create a flow run for a deployment and return it after completion or a timeout.
 
     By default, this function blocks until the flow run finishes executing.
     Specify a timeout (in seconds) to wait for the flow run to execute before
@@ -100,6 +99,19 @@ async def run_deployment(
         job_variables: A dictionary of dot delimited infrastructure overrides that
             will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`
+        client: An optional PrefectClient to use for API requests.
+
+    Example:
+        ```python
+        import asyncio
+        from prefect.deployments import arun_deployment
+
+        async def main():
+            flow_run = await arun_deployment("my-flow/my-deployment")
+            print(flow_run.state)
+
+        asyncio.run(main())
+        ```
     """
     if timeout is not None and timeout < 0:
         raise ValueError("`timeout` cannot be negative")
@@ -119,6 +131,8 @@ async def run_deployment(
         except ValueError:
             pass
 
+    client, _ = get_or_create_client(client)
+
     if deployment_id:
         deployment = await client.read_deployment(deployment_id=deployment_id)
     else:
@@ -222,3 +236,208 @@ async def run_deployment(
         await anyio.sleep(poll_interval)
 
     return flow_run
+
+
+@async_dispatch(arun_deployment)
+def run_deployment(
+    name: Union[str, UUID],
+    parameters: Optional[dict[str, Any]] = None,
+    scheduled_time: Optional[datetime] = None,
+    flow_run_name: Optional[str] = None,
+    timeout: Optional[float] = None,
+    poll_interval: Optional[float] = 5,
+    tags: Optional[Iterable[str]] = None,
+    idempotency_key: Optional[str] = None,
+    work_queue_name: Optional[str] = None,
+    as_subflow: Optional[bool] = True,
+    job_variables: Optional[dict[str, Any]] = None,
+    client: Optional["PrefectClient"] = None,
+) -> "FlowRun":
+    """
+    Create a flow run for a deployment and return it after completion or a timeout.
+
+    This function will dispatch to `arun_deployment` when called from an async context.
+
+    By default, this function blocks until the flow run finishes executing.
+    Specify a timeout (in seconds) to wait for the flow run to execute before
+    returning flow run metadata. To return immediately, without waiting for the
+    flow run to execute, set `timeout=0`.
+
+    Note that if you specify a timeout, this function will return the flow run
+    metadata whether or not the flow run finished executing.
+
+    If called within a flow or task, the flow run this function creates will
+    be linked to the current flow run as a subflow. Disable this behavior by
+    passing `as_subflow=False`.
+
+    Args:
+        name: The deployment id or deployment name in the form:
+            `"flow name/deployment name"`
+        parameters: Parameter overrides for this flow run. Merged with the deployment
+            defaults.
+        scheduled_time: The time to schedule the flow run for, defaults to scheduling
+            the flow run to start now.
+        flow_run_name: A name for the created flow run
+        timeout: The amount of time to wait (in seconds) for the flow run to
+            complete before returning. Setting `timeout` to 0 will return the flow
+            run metadata immediately. Setting `timeout` to None will allow this
+            function to poll indefinitely. Defaults to None.
+        poll_interval: The number of seconds between polls
+        tags: A list of tags to associate with this flow run; tags can be used in
+            automations and for organizational purposes.
+        idempotency_key: A unique value to recognize retries of the same run, and
+            prevent creating multiple flow runs.
+        work_queue_name: The name of a work queue to use for this run. Defaults to
+            the default work queue for the deployment.
+        as_subflow: Whether to link the flow run as a subflow of the current
+            flow or task run.
+        job_variables: A dictionary of dot delimited infrastructure overrides that
+            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
+            `namespace='prefect'`
+        client: An optional PrefectClient to use for API requests. This is ignored
+            when called from a synchronous context.
+
+    Example:
+        ```python
+        from prefect.deployments import run_deployment
+
+        # Sync context
+        flow_run = run_deployment("my-flow/my-deployment")
+        print(flow_run.state)
+
+        # Async context (will dispatch to arun_deployment)
+        async def main():
+            flow_run = await run_deployment("my-flow/my-deployment")
+            print(flow_run.state)
+        ```
+    """
+    if timeout is not None and timeout < 0:
+        raise ValueError("`timeout` cannot be negative")
+
+    if scheduled_time is None:
+        scheduled_time = now("UTC")
+
+    parameters = parameters or {}
+
+    deployment_id = None
+
+    if isinstance(name, UUID):
+        deployment_id = name
+    else:
+        try:
+            deployment_id = UUID(name)
+        except ValueError:
+            pass
+
+    with get_client(sync_client=True) as sync_client:
+        if deployment_id:
+            deployment = sync_client.read_deployment(deployment_id=deployment_id)
+        else:
+            deployment = sync_client.read_deployment_by_name(name)
+
+        flow_run_ctx = FlowRunContext.get()
+        task_run_ctx = TaskRunContext.get()
+        if as_subflow and (flow_run_ctx or task_run_ctx):
+            # TODO: this logic can likely be simplified by using `Task.create_run`
+            from prefect.utilities._engine import dynamic_key_for_task_run
+            from prefect.utilities.engine import collect_task_run_inputs_sync
+
+            # This was called from a flow. Link the flow run as a subflow.
+            task_inputs = {
+                k: collect_task_run_inputs_sync(v) for k, v in parameters.items()
+            }
+
+            # Track parent task if this is being called from within a task
+            # This enables the execution graph to properly display the deployment
+            # flow run as nested under the calling task
+            if task_run_ctx:
+                # The task run is only considered a parent if it is in the same
+                # flow run (otherwise the child is in a subflow, so the subflow
+                # serves as the parent) or if there is no flow run
+                if not flow_run_ctx or (
+                    task_run_ctx.task_run.flow_run_id
+                    == getattr(flow_run_ctx.flow_run, "id", None)
+                ):
+                    task_inputs["__parents__"] = [
+                        TaskRunResult(id=task_run_ctx.task_run.id)
+                    ]
+
+            if deployment_id:
+                flow = sync_client.read_flow(deployment.flow_id)
+                deployment_name = f"{flow.name}/{deployment.name}"
+            else:
+                deployment_name = name
+
+            # Generate a task in the parent flow run to represent the result of the subflow
+            dummy_task = Task(
+                name=deployment_name,
+                fn=lambda: None,
+                version=deployment.version,
+            )
+            # Override the default task key to include the deployment name
+            dummy_task.task_key = (
+                f"{__name__}.run_deployment.{slugify(deployment_name)}"
+            )
+            flow_run_id = (
+                flow_run_ctx.flow_run.id
+                if flow_run_ctx
+                else task_run_ctx.task_run.flow_run_id
+            )
+            dynamic_key = (
+                dynamic_key_for_task_run(flow_run_ctx, dummy_task)
+                if flow_run_ctx
+                else task_run_ctx.task_run.dynamic_key
+            )
+            parent_task_run = sync_client.create_task_run(
+                task=dummy_task,
+                flow_run_id=flow_run_id,
+                dynamic_key=dynamic_key,
+                task_inputs=task_inputs,
+                state=Pending(),
+            )
+            parent_task_run_id = parent_task_run.id
+        else:
+            parent_task_run_id = None
+
+        if flow_run_ctx and flow_run_ctx.flow_run:
+            traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY)
+        elif _is_instrumentation_enabled():
+            traceparent = RunTelemetry.traceparent_from_span(
+                span=trace.get_current_span()
+            )
+        else:
+            traceparent = None
+
+        trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}
+
+        flow_run = sync_client.create_flow_run_from_deployment(
+            deployment.id,
+            parameters=parameters,
+            state=Scheduled(scheduled_time=scheduled_time),
+            name=flow_run_name,
+            tags=tags,
+            idempotency_key=idempotency_key,
+            parent_task_run_id=parent_task_run_id,
+            work_queue_name=work_queue_name,
+            job_variables=job_variables,
+            labels=trace_labels,
+        )
+
+        flow_run_id = flow_run.id
+
+        if timeout == 0:
+            return flow_run
+
+        import time
+
+        start_time = time.monotonic()
+        while True:
+            flow_run = sync_client.read_flow_run(flow_run_id)
+            flow_state = flow_run.state
+            if flow_state and flow_state.is_final():
+                return flow_run
+            if timeout is not None and (time.monotonic() - start_time) >= timeout:
+                return flow_run
+            time.sleep(poll_interval)
+
+    return flow_run
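Taken together with the commit message, the intended calling patterns look roughly like the sketch below. It is illustrative only: it assumes a reachable Prefect API and a deployment named "my-flow/my-deployment" (the name used in the docstring examples), and `timeout=0` is passed just so the calls return immediately.

```python
# Illustrative usage only; assumes a Prefect API is reachable and a
# deployment named "my-flow/my-deployment" exists.
import asyncio

from prefect.deployments import arun_deployment, run_deployment


def sync_caller():
    # Sync context: the synchronous implementation runs directly.
    return run_deployment("my-flow/my-deployment", timeout=0)


async def async_caller():
    # Async context: `run_deployment` dispatches to `arun_deployment`,
    via_dispatch = await run_deployment("my-flow/my-deployment", timeout=0)
    # the explicit async function can be called directly,
    explicit = await arun_deployment("my-flow/my-deployment", timeout=0)
    # and the legacy `run_deployment.aio` attribute keeps working.
    legacy = await run_deployment.aio("my-flow/my-deployment", timeout=0)
    return via_dispatch, explicit, legacy


if __name__ == "__main__":
    sync_caller()
    asyncio.run(async_caller())
```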
