Commit 05971a8

Experimental: AWS S3 storage driver (#1388)
1 parent e9ac9a3 commit 05971a8

14 files changed: +2635 −29 lines changed

README.md

Lines changed: 26 additions & 10 deletions

```diff
@@ -56,6 +56,7 @@ informal introduction to the features and their implementation.
 - [Custom Type Data Conversion](#custom-type-data-conversion)
 - [External Storage](#external-storage)
   - [Driver Selection](#driver-selection)
+  - [Built-in Drivers](#built-in-drivers)
   - [Custom Drivers](#custom-drivers)
 - [Workers](#workers)
 - [Workflows](#workflows)
```
````diff
@@ -467,25 +468,36 @@ External storage allows large payloads to be offloaded to an external storage se
 
 External storage is configured via the `external_storage` parameter on `DataConverter`. It should be configured on the `Client` both for clients of your workflow as well as on the worker -- anywhere large payloads may be uploaded or downloaded.
 
-A `StorageDriver` handles uploading and downloading payloads. Temporal provides built-in drivers for common storage solutions, or you may customize one. Here's an example using our provided `InMemoryTestDriver`.
+A `StorageDriver` handles uploading and downloading payloads. Temporal provides [built-in drivers](#built-in-drivers) for common storage solutions, or you may implement a [custom driver](#custom-drivers). Here's an example using the built-in `S3StorageDriver` with the SDK's `aioboto3` client:
 
 ```python
+import aioboto3
 import dataclasses
-from temporalio.client import Client
+from temporalio.client import Client, ClientConfig
+from temporalio.contrib.aws.s3driver import S3StorageDriver
+from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client
 from temporalio.converter import DataConverter
 from temporalio.converter import ExternalStorage
 
-driver = InMemoryTestDriver()
+client_config = ClientConfig.load_client_connect_config()
 
-client = await Client.connect(
-    "localhost:7233",
-    data_converter=dataclasses.replace(
-        DataConverter.default,
-        external_storage=ExternalStorage(drivers=[driver]),
-    ),
-)
+session = aioboto3.Session()
+async with session.client("s3") as s3_client:
+    driver = S3StorageDriver(
+        client=new_aioboto3_client(s3_client),
+        bucket="my-bucket",
+    )
+    client = await Client.connect(
+        **client_config,
+        data_converter=dataclasses.replace(
+            DataConverter.default,
+            external_storage=ExternalStorage(drivers=[driver]),
+        ),
+    )
 ```
 
+See the [S3 driver README](temporalio/contrib/aws/s3driver/) for further details.
+
 Some things to note about external storage:
 
 * Only payloads that meet or exceed `ExternalStorage.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
````
```diff
@@ -540,6 +552,10 @@ Some things to note about driver selection:
 * Returning `None` from a selector leaves the payload stored inline in workflow history rather than offloading it.
 * The driver instance returned by the selector must be one of the instances registered in `ExternalStorage.drivers`. If it is not, an error is raised.
 
+###### Built-in Drivers
+
+- **[S3 Storage Driver](temporalio/contrib/aws/s3driver/)**: ⚠️ **Experimental** ⚠️ Amazon S3 driver. Ships with an aioboto3 client, or bring your own by subclassing `S3StorageDriverClient`.
+
 ###### Custom Drivers
 
 Implement `temporalio.converter.StorageDriver` to integrate with an external storage system:
```

pyproject.toml

Lines changed: 5 additions & 0 deletions

```diff
@@ -30,6 +30,10 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
 pydantic = ["pydantic>=2.0.0,<3"]
 openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"]
 google-adk = ["google-adk>=1.27.0,<2"]
+aioboto3 = [
+    "aioboto3>=10.4.0",
+    "types-aioboto3[s3]>=10.4.0",
+]
 
 [project.urls]
 Homepage = "https://github.com/temporalio/sdk-python"
```
```diff
@@ -64,6 +68,7 @@ dev = [
     "openinference-instrumentation-google-adk>=0.1.8",
     "googleapis-common-protos==1.70.0",
     "pytest-rerunfailures>=16.1",
+    "moto[s3,server]>=5",
 ]
 
 [tool.poe.tasks]
```
temporalio/contrib/aws/s3driver/README.md

Lines changed: 104 additions & 0 deletions

# AWS Integration for Temporal Python SDK

> ⚠️ **This package is currently at an experimental release stage.** ⚠️

This package provides AWS integrations for the Temporal Python SDK, including an Amazon S3 driver for [external storage](../../../README.md#external-storage).

## S3 Driver

`S3StorageDriver` stores and retrieves Temporal payloads in Amazon S3. It accepts any `S3StorageDriverClient` implementation and a `bucket`, which is either a static name or a callable for dynamic per-payload selection.

### Using the built-in aioboto3 client

The SDK ships with an [`aioboto3`](https://github.com/terrycain/aioboto3)-based client. Install the extra to pull in its dependencies:

```
python -m pip install "temporalio[aioboto3]"
```

```python
import aioboto3
import dataclasses
from temporalio.client import Client
from temporalio.contrib.aws.s3driver import S3StorageDriver
from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client
from temporalio.converter import DataConverter, ExternalStorage

session = aioboto3.Session()
# Credentials and region are resolved automatically from the standard AWS credential
# chain, e.g. environment variables, ~/.aws/config, IAM instance profile, and so on.
async with session.client("s3") as s3_client:
    driver = S3StorageDriver(
        client=new_aioboto3_client(s3_client),
        bucket="my-temporal-payloads",
    )

    client = await Client.connect(
        "localhost:7233",
        data_converter=dataclasses.replace(
            DataConverter.default,
            external_storage=ExternalStorage(drivers=[driver]),
        ),
    )
```

### Custom S3 client implementations

To use a different S3 library, subclass `S3StorageDriverClient` and implement `put_object`, `get_object`, and `object_exists`. The ABC has no external dependencies, so no AWS packages are required to import it.

```python
from temporalio.contrib.aws.s3driver import S3StorageDriver, S3StorageDriverClient

class MyS3Client(S3StorageDriverClient):
    async def put_object(self, *, bucket: str, key: str, data: bytes) -> None: ...
    async def object_exists(self, *, bucket: str, key: str) -> bool: ...
    async def get_object(self, *, bucket: str, key: str) -> bytes: ...

driver = S3StorageDriver(client=MyS3Client(), bucket="my-temporal-payloads")
```
### Key structure

Payloads are stored under content-addressable keys derived from a SHA-256 hash of the serialized payload bytes, segmented by namespace and workflow/activity identifiers when serialization context is available, e.g.:

```
v0/ns/my-namespace/wfi/my-workflow-id/d/sha256/<hash>
```
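The layout above can be reproduced with `hashlib`. This is an illustrative sketch only (the SDK's actual key-building code is internal, and `payload_key` is a name invented here):

```python
import hashlib


def payload_key(namespace: str, workflow_id: str, data: bytes) -> str:
    # Mirrors the documented layout: v0/ns/<namespace>/wfi/<workflow-id>/d/sha256/<hash>
    digest = hashlib.sha256(data).hexdigest()
    return f"v0/ns/{namespace}/wfi/{workflow_id}/d/sha256/{digest}"
```

Because the hash covers only the payload bytes while the prefix carries the namespace and workflow identifiers, identical bytes collide (deliberately) within one workflow but land under different keys across workflows or namespaces.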
### Notes

* Any driver used to store payloads must also be configured on the component that retrieves them. If the client stores workflow inputs using this driver, the worker must include it in its `ExternalStorage.drivers` list to retrieve them.
* The target S3 bucket must already exist; the driver will not create it.
* Identical serialized bytes within the same namespace and workflow (or activity) share the same S3 object: the key is content-addressable within that scope. The same bytes used across different workflows or namespaces produce distinct S3 objects because the key includes the namespace and workflow/activity identifiers.
* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `None` to offload every payload regardless of size.
* `S3StorageDriver.max_payload_size` (default: 50 MiB) sets a hard upper limit on the serialized size of any single payload. A `ValueError` is raised at store time if a payload exceeds this limit. Increase it if your workflows produce payloads larger than 50 MiB.
* Override `S3StorageDriver.driver_name` only when registering multiple `S3StorageDriver` instances with distinct configurations under the same `ExternalStorage.drivers` list.
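The size rules in these notes can be summarized as a small decision function. This is a sketch of the documented behavior, not SDK code (`should_offload` is a name invented here):

```python
from __future__ import annotations

DEFAULT_THRESHOLD = 256 * 1024       # ExternalStorage.payload_size_threshold default
DEFAULT_MAX_SIZE = 50 * 1024 * 1024  # S3StorageDriver.max_payload_size default


def should_offload(
    size: int,
    threshold: int | None = DEFAULT_THRESHOLD,
    max_size: int = DEFAULT_MAX_SIZE,
) -> bool:
    """Return True if a payload of `size` serialized bytes would be offloaded."""
    if size > max_size:
        # Per the notes, the driver raises at store time for oversized payloads.
        raise ValueError(f"payload of {size} bytes exceeds max_payload_size {max_size}")
    if threshold is None:
        return True  # None offloads every payload regardless of size
    return size >= threshold  # "at or above" the threshold is offloaded
```

Note the boundary semantics: a payload of exactly 256 KiB is offloaded, one byte less stays inline.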
### Dynamic Bucket Selection

To select the S3 bucket per payload, pass a callable as `bucket`:

```python
from temporalio.contrib.aws.s3driver import S3StorageDriver
from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client

driver = S3StorageDriver(
    client=new_aioboto3_client(s3_client),
    bucket=lambda context, payload: (
        "large-payloads" if payload.ByteSize() > 10 * 1024 * 1024 else "small-payloads"
    ),
)
```
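The lambda's size logic, restated as a named function so the boundary is easy to unit-test (the 10 MiB cutoff and bucket names are just the example's values, not SDK defaults):

```python
def select_bucket(size_bytes: int) -> str:
    # Matches the lambda above: strictly greater than 10 MiB goes to the large bucket.
    return "large-payloads" if size_bytes > 10 * 1024 * 1024 else "small-payloads"
```

Keeping the selector as a plain function of the payload size also makes it trivial to reuse between the client and worker configurations.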
### Required IAM permissions

The AWS credentials used by your S3 client must have the following S3 permissions on the target bucket and its objects:

```json
{
  "Effect": "Allow",
  "Action": [
    "s3:PutObject",
    "s3:GetObject"
  ],
  "Resource": "arn:aws:s3:::my-temporal-payloads/*"
}
```

`s3:PutObject` is required by components that store payloads (typically the Temporal client and worker sending workflow/activity inputs), and `s3:GetObject` is required by components that retrieve them (typically workers and clients reading results). Components that only retrieve payloads do not need `s3:PutObject`, and vice versa.
temporalio/contrib/aws/s3driver/__init__.py

Lines changed: 13 additions & 0 deletions

```python
"""Amazon S3 storage driver for Temporal external storage.

.. warning::
    This API is experimental.
"""

from temporalio.contrib.aws.s3driver._client import S3StorageDriverClient
from temporalio.contrib.aws.s3driver._driver import S3StorageDriver

__all__ = [
    "S3StorageDriverClient",
    "S3StorageDriver",
]
```
temporalio/contrib/aws/s3driver/_client.py

Lines changed: 32 additions & 0 deletions

```python
"""S3 storage driver client abstraction for the S3 storage driver.

.. warning::
    This API is experimental.
"""

from __future__ import annotations

from abc import ABC, abstractmethod


class S3StorageDriverClient(ABC):
    """Abstract base class for S3 object operations.

    Implementations must support ``put_object`` and ``get_object``. Multipart
    upload handling (if needed) is an internal concern of each implementation.

    .. warning::
        This API is experimental.
    """

    @abstractmethod
    async def put_object(self, *, bucket: str, key: str, data: bytes) -> None:
        """Upload *data* to the given S3 *bucket* and *key*."""

    @abstractmethod
    async def object_exists(self, *, bucket: str, key: str) -> bool:
        """Return ``True`` if an object exists at the given *bucket* and *key*."""

    @abstractmethod
    async def get_object(self, *, bucket: str, key: str) -> bytes:
        """Download and return the bytes stored at the given S3 *bucket* and *key*."""
```