These examples mirror examples/workflow/ but author orchestrators with async def using the
async workflow APIs. Activities can be either sync or async functions.
How to run:
-
Install Dapr CLI:
brew install dapr/tap/dapr-cliorchoco install dapr-cli -
Initialize Dapr:
dapr init -
Install requirements:
cd examples/workflow-async python -m venv .venv source .venv/bin/activate pip install -r requirements.txt
or better yet with faster
uv:uv venv .venv source .venv/bin/activate uv pip install -r requirements.txt -
Run any example with dapr:
dapr run --app-id wf_async_symple -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python simple.pydapr run --app-id wf_task_chain -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python task_chaining.pydapr run --app-id wf_async_child -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python child_workflow.pydapr run --app-id wf_async_fafi -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python fan_out_fan_in.pydapr run --app-id wf_async_gather -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python fan_out_fan_in_with_gather.pydapr run --app-id wf_async_approval -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python human_approval.pydapr run --app-id wf_ctx_interceptors -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python context_interceptors_example.pydapr run --app-id wf_async_http -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python async_http_activity.py
- simple.py: Comprehensive example showing activities, child workflows, retry policies, and external events
- task_chaining.py: Sequential activity calls where each result feeds into the next
- child_workflow.py: Parent workflow calling a child workflow
- fan_out_fan_in.py: Parallel activity execution pattern
- fan_out_fan_in_with_gather.py: Parallel execution using
ctx.when_all() - human_approval.py: Workflow waiting for external event to proceed
- context_interceptors_example.py: Context propagation using interceptors (tenant, request ID, etc.)
- async_http_activity.py: Async activities performing I/O-bound operations (HTTP requests with aiohttp)
Notes:
- Orchestrators use
await ctx.activity(...),await ctx.create_timer(...),await ctx.when_all/when_any(...), etc. - No event loop is started manually; the Durable Task worker drives the async orchestrators.
- You can also launch instances using
DaprWorkflowClientas in the non-async examples. - The interceptors example demonstrates how to propagate context (tenant, request ID) across workflow and activity boundaries using the wrapper pattern to avoid contextvar loss.
Activities can be either synchronous or asynchronous functions. Async activities are useful for I/O-bound operations like HTTP requests, database queries, or file operations:
from dapr.ext.workflow import WorkflowActivityContext
# Synchronous activity
@wfr.activity
def sync_activity(ctx: WorkflowActivityContext, data: str) -> str:
return data.upper()
# Asynchronous activity
@wfr.activity
async def async_activity(ctx: WorkflowActivityContext, data: str) -> str:
# Perform async I/O operations
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/{data}") as response:
result = await response.json()
return resultBoth sync and async activities are registered the same way using the @wfr.activity decorator. Orchestrators call them identically regardless of whether they're sync or async - the SDK handles the execution automatically.
When to use async activities:
- HTTP requests or API calls
- Database queries
- File I/O operations
- Any I/O-bound work that benefits from async/await
See async_http_activity.py for a complete example.