Skip to content

Commit f0e8518

Browse files
authored
Add contrib package for lambda workers (#1408)
1 parent 27686cf commit f0e8518

20 files changed

Lines changed: 1655 additions & 29 deletions

File tree

pyproject.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
3030
pydantic = ["pydantic>=2.0.0,<3"]
3131
openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"]
3232
google-adk = ["google-adk>=1.27.0,<2"]
33+
lambda-worker-otel = [
34+
"opentelemetry-api>=1.11.1,<2",
35+
"opentelemetry-sdk>=1.11.1,<2",
36+
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
37+
"opentelemetry-semantic-conventions>=0.40b0,<1",
38+
"opentelemetry-sdk-extension-aws>=2.0.0,<3",
39+
]
3340
aioboto3 = [
3441
"aioboto3>=10.4.0",
3542
"types-aioboto3[s3]>=10.4.0",
@@ -69,6 +76,9 @@ dev = [
6976
"googleapis-common-protos==1.70.0",
7077
"pytest-rerunfailures>=16.1",
7178
"moto[s3,server]>=5",
79+
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
80+
"opentelemetry-semantic-conventions>=0.40b0,<1",
81+
"opentelemetry-sdk-extension-aws>=2.0.0,<3",
7282
]
7383

7484
[tool.poe.tasks]

temporalio/client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2805,6 +2805,29 @@ async def get_worker_task_reachability(
28052805
)
28062806

28072807

2808+
class ClientConnectConfig(TypedDict, total=False):
2809+
"""TypedDict of keyword arguments for :py:meth:`Client.connect`."""
2810+
2811+
target_host: str
2812+
namespace: str
2813+
api_key: str | None
2814+
data_converter: temporalio.converter.DataConverter
2815+
plugins: Sequence[Plugin]
2816+
interceptors: Sequence[Interceptor]
2817+
default_workflow_query_reject_condition: (
2818+
temporalio.common.QueryRejectCondition | None
2819+
)
2820+
tls: bool | TLSConfig | None
2821+
retry_config: RetryConfig | None
2822+
keep_alive_config: KeepAliveConfig | None
2823+
rpc_metadata: Mapping[str, str | bytes]
2824+
identity: str | None
2825+
lazy: bool
2826+
runtime: temporalio.runtime.Runtime | None
2827+
http_connect_proxy_config: HttpConnectProxyConfig | None
2828+
header_codec_behavior: HeaderCodecBehavior
2829+
2830+
28082831
class ClientConfig(TypedDict, total=False):
28092832
"""TypedDict of config originally passed to :py:meth:`Client`."""
28102833

temporalio/contrib/aws/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""AWS integrations for Temporal SDK."""
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# lambda_worker
2+
3+
A wrapper for running [Temporal](https://temporal.io) workers inside AWS Lambda. A single
4+
`run_worker` call handles the full per-invocation lifecycle: connecting to the Temporal server,
5+
creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully shutting down before
6+
the invocation deadline.
7+
8+
## Quick start
9+
10+
```python
11+
# handler.py
12+
from temporalio.common import WorkerDeploymentVersion
13+
from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker
14+
15+
from my_workflows import MyWorkflow
16+
from my_activities import my_activity
17+
18+
19+
def configure(config: LambdaWorkerConfig) -> None:
20+
config.worker_config["task_queue"] = "my-task-queue"
21+
config.worker_config["workflows"] = [MyWorkflow]
22+
config.worker_config["activities"] = [my_activity]
23+
24+
25+
lambda_handler = run_worker(
26+
WorkerDeploymentVersion(
27+
deployment_name="my-service",
28+
build_id="v1.0",
29+
),
30+
configure,
31+
)
32+
```
33+
34+
## Configuration
35+
36+
Client connection settings (address, namespace, TLS, API key) are loaded
37+
automatically from a TOML config file and/or environment variables via
38+
`temporalio.envconfig`. The config file is resolved in order:
39+
40+
1. `TEMPORAL_CONFIG_FILE` env var, if set.
41+
2. `temporal.toml` in `$LAMBDA_TASK_ROOT` (typically `/var/task`).
42+
3. `temporal.toml` in the current working directory.
43+
44+
The file is optional -- if absent, only environment variables are used.
45+
46+
The configure callback receives a `LambdaWorkerConfig` dataclass with fields
47+
pre-populated with Lambda-appropriate defaults. Override any field directly in
48+
the callback. The `task_queue` key in `worker_config` is pre-populated from the
49+
`TEMPORAL_TASK_QUEUE` environment variable if set.
50+
51+
## Lambda-tuned worker defaults
52+
53+
The package applies conservative concurrency limits suited to Lambda's resource
54+
constraints:
55+
56+
| Setting | Default |
57+
| --- | --- |
58+
| `max_concurrent_activities` | 2 |
59+
| `max_concurrent_workflow_tasks` | 10 |
60+
| `max_concurrent_local_activities` | 2 |
61+
| `max_concurrent_nexus_tasks` | 5 |
62+
| `workflow_task_poller_behavior` | `SimpleMaximum(2)` |
63+
| `activity_task_poller_behavior` | `SimpleMaximum(1)` |
64+
| `nexus_task_poller_behavior` | `SimpleMaximum(1)` |
65+
| `graceful_shutdown_timeout` | 5 seconds |
66+
| `max_cached_workflows` | 100 |
67+
| `disable_eager_activity_execution` | Always `True` |
68+
69+
Worker Deployment Versioning is always enabled.
70+
71+
## Observability
72+
73+
Metrics and tracing are opt-in. The `otel` module provides convenience helpers
74+
for AWS Distro for OpenTelemetry (ADOT):
75+
76+
```python
77+
from temporalio.common import WorkerDeploymentVersion
78+
from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker
79+
from temporalio.contrib.aws.lambda_worker.otel import apply_defaults, OtelOptions
80+
81+
82+
def configure(config: LambdaWorkerConfig) -> None:
83+
config.worker_config["task_queue"] = "my-task-queue"
84+
config.worker_config["workflows"] = [MyWorkflow]
85+
config.worker_config["activities"] = [my_activity]
86+
apply_defaults(config, OtelOptions())
87+
88+
89+
lambda_handler = run_worker(
90+
WorkerDeploymentVersion(
91+
deployment_name="my-service",
92+
build_id="v1.0",
93+
),
94+
configure,
95+
)
96+
```
97+
98+
You can also use `apply_metrics` or `apply_tracing` individually.
99+
100+
If you use OTEL, you can use
101+
[ADOT](https://aws-otel.github.io/docs/getting-started/lambda/lambda-python)
102+
(the AWS Distro For OpenTelemetry) to automatically integrate with AWS
103+
observability functionality. Namely, you will want to add the Lambda layer in
104+
the aforementioned link. We'll handle setting up the SDK for you.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""A wrapper for running Temporal workers inside AWS Lambda.
2+
3+
A single :py:func:`run_worker` call handles the full per-invocation lifecycle: connecting to the
4+
Temporal server, creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully
5+
shutting down before the invocation deadline.
6+
7+
Quick start::
8+
9+
from temporalio.common import WorkerDeploymentVersion
10+
from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker
11+
12+
def configure(config: LambdaWorkerConfig) -> None:
13+
config.worker_config["task_queue"] = "my-task-queue"
14+
config.worker_config["workflows"] = [MyWorkflow]
15+
config.worker_config["activities"] = [my_activity]
16+
17+
lambda_handler = run_worker(
18+
WorkerDeploymentVersion(
19+
deployment_name="my-service",
20+
build_id="v1.0",
21+
),
22+
configure,
23+
)
24+
25+
Configuration
26+
-------------
27+
Client connection settings (address, namespace, TLS, API key) are loaded automatically from a TOML
28+
config file and/or environment variables via :py:mod:`temporalio.envconfig`. The config file is
29+
resolved in order:
30+
31+
1. ``TEMPORAL_CONFIG_FILE`` env var, if set.
32+
2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``).
33+
3. ``temporal.toml`` in the current working directory.
34+
35+
The file is optional -- if absent, only environment variables are used.
36+
37+
The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with fields pre-populated
38+
with Lambda-appropriate defaults. Override any field directly in the callback. The ``task_queue``
39+
key in ``worker_config`` is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if
40+
set.
41+
"""
42+
43+
from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig
44+
from temporalio.contrib.aws.lambda_worker._run_worker import run_worker
45+
46+
__all__ = [
47+
"LambdaWorkerConfig",
48+
"run_worker",
49+
]
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
"""Configuration for the Lambda worker."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import logging
7+
from collections.abc import Awaitable, Callable
8+
from dataclasses import dataclass, field
9+
from datetime import timedelta
10+
11+
from temporalio.client import ClientConnectConfig
12+
from temporalio.worker import WorkerConfig
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
@dataclass
18+
class LambdaWorkerConfig:
19+
"""Passed to the configure callback of :py:func:`run_worker`.
20+
21+
Fields are pre-populated with Lambda-appropriate defaults before the configure callback is
22+
invoked; the callback may read and override any of them.
23+
24+
Use ``worker_config`` to set task queue, register workflows/activities, and tune worker options.
25+
The ``task_queue`` key is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if
26+
set.
27+
28+
Attributes:
29+
client_connect_config: Keyword arguments that will be passed to
30+
:py:meth:`temporalio.client.Client.connect`. Pre-populated from the
31+
config file / environment variables via envconfig, with Lambda
32+
defaults applied.
33+
worker_config: Keyword arguments that will be passed to the
34+
:py:class:`temporalio.worker.Worker` constructor (the ``client``
35+
key is managed internally). Pre-populated with Lambda-appropriate
36+
defaults (low concurrency, eager activities disabled) and
37+
``task_queue`` from ``TEMPORAL_TASK_QUEUE`` if set.
38+
shutdown_deadline_buffer: How long before the Lambda invocation
39+
deadline the worker begins its shutdown sequence (worker drain +
40+
shutdown hooks). Pre-populated to
41+
``graceful_shutdown_timeout + 2s``. If you change
42+
``graceful_shutdown_timeout`` in ``worker_config``, adjust this
43+
accordingly.
44+
shutdown_hooks: Functions called at the end of each Lambda invocation,
45+
after the worker has stopped. Run in list order. Each may be sync
46+
or async. Use this to flush telemetry providers or release other
47+
per-process resources.
48+
"""
49+
50+
client_connect_config: ClientConnectConfig = field(
51+
default_factory=ClientConnectConfig
52+
)
53+
worker_config: WorkerConfig = field(default_factory=WorkerConfig)
54+
shutdown_deadline_buffer: timedelta = field(
55+
default_factory=lambda: timedelta(seconds=7)
56+
)
57+
shutdown_hooks: list[Callable[[], Awaitable[None] | None]] = field(
58+
default_factory=list
59+
)
60+
61+
62+
async def _run_shutdown_hooks( # type:ignore[reportUnusedFunction]
63+
config: LambdaWorkerConfig,
64+
) -> None:
65+
"""Run all registered shutdown hooks in order, logging errors."""
66+
for fn in config.shutdown_hooks:
67+
try:
68+
result = fn()
69+
if asyncio.iscoroutine(result):
70+
await result
71+
except Exception as e:
72+
logger.error(f"shutdown hook error: {e}")
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Lambda-tuned defaults for Temporal worker and client configuration."""
2+
3+
from __future__ import annotations
4+
5+
import os
6+
from collections.abc import Callable
7+
from datetime import timedelta
8+
from pathlib import Path
9+
10+
from temporalio.worker import PollerBehaviorSimpleMaximum, WorkerConfig
11+
12+
# ---- Lambda-tuned worker defaults ----
13+
# Conservative concurrency limits suited to Lambda's resource constraints.
14+
15+
DEFAULT_MAX_CONCURRENT_ACTIVITIES: int = 2
16+
DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS: int = 10
17+
DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES: int = 2
18+
DEFAULT_MAX_CONCURRENT_NEXUS_TASKS: int = 5
19+
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: timedelta = timedelta(seconds=5)
20+
DEFAULT_SHUTDOWN_HOOK_BUFFER: timedelta = timedelta(seconds=2)
21+
DEFAULT_MAX_CACHED_WORKFLOWS: int = 30
22+
23+
DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=2)
24+
DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1)
25+
DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1)
26+
27+
# ---- Environment variable names ----
28+
ENV_TASK_QUEUE = "TEMPORAL_TASK_QUEUE"
29+
ENV_LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT"
30+
ENV_CONFIG_FILE = "TEMPORAL_CONFIG_FILE"
31+
DEFAULT_CONFIG_FILE = "temporal.toml"
32+
33+
34+
def apply_lambda_worker_defaults(config: WorkerConfig) -> None:
35+
"""Apply Lambda-appropriate defaults to worker config.
36+
37+
Only sets values that have not already been set (i.e. are absent from *config*).
38+
``disable_eager_activity_execution`` is always set to ``True``.
39+
"""
40+
config.setdefault("max_concurrent_activities", DEFAULT_MAX_CONCURRENT_ACTIVITIES)
41+
config.setdefault(
42+
"max_concurrent_workflow_tasks", DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS
43+
)
44+
config.setdefault(
45+
"max_concurrent_local_activities", DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES
46+
)
47+
config.setdefault("max_concurrent_nexus_tasks", DEFAULT_MAX_CONCURRENT_NEXUS_TASKS)
48+
config.setdefault("graceful_shutdown_timeout", DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT)
49+
config.setdefault("max_cached_workflows", DEFAULT_MAX_CACHED_WORKFLOWS)
50+
config.setdefault(
51+
"workflow_task_poller_behavior", DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR
52+
)
53+
config.setdefault(
54+
"activity_task_poller_behavior", DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR
55+
)
56+
config.setdefault("nexus_task_poller_behavior", DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR)
57+
# Always disable eager activities in Lambda.
58+
config["disable_eager_activity_execution"] = True
59+
60+
61+
def build_lambda_identity(request_id: str, function_arn: str) -> str:
62+
"""Build a worker identity string from the Lambda invocation context.
63+
64+
Format: ``<request_id>@<function_arn>``.
65+
"""
66+
return f"{request_id or 'unknown'}@{function_arn or 'unknown'}"
67+
68+
69+
def lambda_default_config_file_path(
70+
getenv: Callable[[str], str] = os.environ.get, # type: ignore[assignment]
71+
) -> Path:
72+
"""Return the config file path for a Lambda environment.
73+
74+
Resolution order:
75+
76+
1. ``TEMPORAL_CONFIG_FILE`` env var, if set.
77+
2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``).
78+
3. ``temporal.toml`` in the current working directory.
79+
"""
80+
config_file = getenv(ENV_CONFIG_FILE)
81+
if config_file:
82+
return Path(config_file)
83+
root = getenv(ENV_LAMBDA_TASK_ROOT) or "."
84+
return Path(root) / DEFAULT_CONFIG_FILE

0 commit comments

Comments
 (0)