Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
813 changes: 475 additions & 338 deletions docs/sdk-reference/operations/map.md

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions examples/java/operations/map/completion-config.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.MapConfig;
import software.amazon.lambda.durable.model.MapResult;

public class MapCompletionConfig extends DurableHandler<List<String>, List<String>> {
@Override
public List<String> handleRequest(List<String> items, DurableContext context) {
var config = MapConfig.builder()
.completionConfig(CompletionConfig.minSuccessful(3))
.build();

MapResult<String> result = context.map(
"process-items",
items,
String.class,
(item, index, ctx) -> ctx.step(
"process-" + index, String.class, s -> item.toUpperCase()),
config);

return result.succeeded();
}
}
28 changes: 28 additions & 0 deletions examples/java/operations/map/error-handling.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.MapResult;

public class MapErrorHandling extends DurableHandler<List<String>, Void> {
@Override
public Void handleRequest(List<String> items, DurableContext context) {
MapResult<String> result = context.map(
"process-items",
items,
String.class,
(item, index, ctx) -> ctx.step("process-" + index, String.class, s -> {
if ("bad".equals(item)) throw new IllegalArgumentException("bad item");
return item.toUpperCase();
}));

var failures = result.failed();
if (!failures.isEmpty()) {
System.out.println(failures.size() + " items failed");
failures.forEach(e -> System.out.println(e.errorType() + ": " + e.errorMessage()));
}

var successes = result.succeeded();
System.out.println(successes.size() + " items succeeded: " + successes);
return null;
}
}
34 changes: 34 additions & 0 deletions examples/java/operations/map/map-config.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.MapConfig;
import software.amazon.lambda.durable.model.MapResult;

public class MapConfigExample extends DurableHandler<List<String>, List<String>> {
private static final HttpClient HTTP = HttpClient.newHttpClient();

@Override
public List<String> handleRequest(List<String> urls, DurableContext context) {
var config = MapConfig.builder()
.maxConcurrency(5)
.completionConfig(CompletionConfig.toleratedFailureCount(2))
.build();

MapResult<String> result = context.map(
"fetch-urls",
urls,
String.class,
(url, index, ctx) -> ctx.step("fetch-" + index, String.class, s -> {
var request = HttpRequest.newBuilder(URI.create(url)).build();
return HTTP.send(request, HttpResponse.BodyHandlers.ofString()).body();
}),
config);

return result.succeeded();
}
}
14 changes: 14 additions & 0 deletions examples/java/operations/map/map-function.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import software.amazon.lambda.durable.DurableContext;

record Order(String id, double amount) {}
record Receipt(String orderId, double charged) {}

// MapFunction<Order, Receipt> implementation
Receipt processOrder(Order order, int index, DurableContext ctx) {
var validated = ctx.step("validate", Order.class, s -> {
if (order.amount() <= 0) throw new IllegalArgumentException("Invalid amount");
return order;
});
var charged = ctx.step("charge", Double.class, s -> validated.amount());
return new Receipt(validated.id(), charged);
}
21 changes: 21 additions & 0 deletions examples/java/operations/map/map-signature.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// sync — blocks until all items complete
<I, O> MapResult<O> map(String name, Collection<I> items, Class<O> resultType,
MapFunction<I, O> function)
<I, O> MapResult<O> map(String name, Collection<I> items, Class<O> resultType,
MapFunction<I, O> function, MapConfig config)
<I, O> MapResult<O> map(String name, Collection<I> items, TypeToken<O> resultType,
MapFunction<I, O> function)
<I, O> MapResult<O> map(String name, Collection<I> items, TypeToken<O> resultType,
MapFunction<I, O> function, MapConfig config)

// async — returns immediately
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
Class<O> resultType, MapFunction<I, O> function)
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
Class<O> resultType, MapFunction<I, O> function,
MapConfig config)
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
TypeToken<O> resultType, MapFunction<I, O> function)
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
TypeToken<O> resultType, MapFunction<I, O> function,
MapConfig config)
19 changes: 19 additions & 0 deletions examples/java/operations/map/named-map.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.MapResult;

public class NamedMap extends DurableHandler<List<String>, List<String>> {
@Override
public List<String> handleRequest(List<String> userIds, DurableContext context) {
// The name is always required in Java
MapResult<String> result = context.map(
"process-users",
userIds,
String.class,
(userId, index, ctx) -> ctx.step(
"process-" + index, String.class, s -> "processed-" + userId));

return result.succeeded();
}
}
28 changes: 28 additions & 0 deletions examples/java/operations/map/nested-map.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.model.MapResult;

public class NestedMap extends DurableHandler<List<Region>, List<List<String>>> {
record Region(String name, List<String> items) {}

@Override
public List<List<String>> handleRequest(List<Region> regions, DurableContext context) {
MapResult<List<String>> result = context.map(
"process-regions",
regions,
new TypeToken<List<String>>() {},
(region, index, ctx) -> {
MapResult<String> inner = ctx.map(
"process-" + region.name(),
region.items(),
String.class,
(item, i, innerCtx) -> innerCtx.step(
"item-" + i, String.class, s -> item.toUpperCase()));
return inner.succeeded();
});

return result.succeeded();
}
}
18 changes: 18 additions & 0 deletions examples/java/operations/map/simple-map.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.MapResult;

public class SimpleMap extends DurableHandler<Void, List<Integer>> {
@Override
public List<Integer> handleRequest(Void input, DurableContext context) {
MapResult<Integer> result = context.map(
"square-numbers",
List.of(1, 2, 3, 4, 5),
Integer.class,
(item, index, ctx) -> ctx.step(
"square-" + index, Integer.class, s -> item * item));

return result.results();
}
}
26 changes: 26 additions & 0 deletions examples/python/operations/map/completion-config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, MapConfig


def process_item(
ctx: DurableContext, item: str, index: int, items: list[str]
) -> str:
return ctx.step(lambda _: item.upper(), name=f"process-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
config = MapConfig(
completion_config=CompletionConfig(min_successful=3),
)
result: BatchResult[str] = context.map(
event["items"],
process_item,
name="process-items",
config=config,
)
return result.to_dict()
32 changes: 32 additions & 0 deletions examples/python/operations/map/error-handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)


def process_item(
ctx: DurableContext, item: str, index: int, items: list[str]
) -> str:
def do_process(_):
if item == "bad":
raise ValueError("bad item")
return item.upper()

return ctx.step(do_process, name=f"process-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> None:
result: BatchResult[str] = context.map(
event["items"],
process_item,
name="process-items",
)

if result.has_failure:
errors = result.get_errors()
print(f"{result.failure_count} items failed:", errors)

successes = result.get_results()
print(f"{result.success_count} items succeeded:", successes)
33 changes: 33 additions & 0 deletions examples/python/operations/map/map-config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import urllib.request

from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, MapConfig


def fetch_url(
ctx: DurableContext, url: str, index: int, urls: list[str]
) -> str:
def do_fetch(_):
with urllib.request.urlopen(url) as response:
return response.read().decode()

return ctx.step(do_fetch, name=f"fetch-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
config = MapConfig(
max_concurrency=5,
completion_config=CompletionConfig(tolerated_failure_count=2),
)
result: BatchResult[str] = context.map(
event["urls"],
fetch_url,
name="fetch-urls",
config=config,
)
return result.to_dict()
17 changes: 17 additions & 0 deletions examples/python/operations/map/map-function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from aws_durable_execution_sdk_python import DurableContext


def process_order(
ctx: DurableContext,
order: dict,
index: int,
orders: list[dict],
) -> dict:
def validate(_):
if order["amount"] <= 0:
raise ValueError("Invalid amount")
return order

validated = ctx.step(validate, name="validate")
charged = ctx.step(lambda _: validated["amount"], name="charge")
return {"orderId": validated["id"], "charged": charged}
6 changes: 6 additions & 0 deletions examples/python/operations/map/map-signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def map(
inputs: Sequence[U],
func: Callable[[DurableContext, U, int, Sequence[U]], T],
name: str | None = None,
config: MapConfig | None = None,
) -> BatchResult[T]: ...
22 changes: 22 additions & 0 deletions examples/python/operations/map/named-map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)


def process_user(
ctx: DurableContext, user_id: str, index: int, user_ids: list[str]
) -> str:
return ctx.step(lambda _: f"processed-{user_id}", name=f"process-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
# Pass name as keyword argument; omit or pass None to leave unnamed
result: BatchResult[str] = context.map(
event["userIds"],
process_user,
name="process-users",
)
return result.to_dict()
32 changes: 32 additions & 0 deletions examples/python/operations/map/nested-map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)


def process_item(
ctx: DurableContext, item: str, index: int, items: list[str]
) -> str:
return ctx.step(lambda _: item.upper(), name=f"item-{index}")


def process_region(
ctx: DurableContext, region: dict, index: int, regions: list[dict]
) -> list[str]:
inner: BatchResult[str] = ctx.map(
region["items"],
process_item,
name=f"process-{region['name']}",
)
return inner.get_results()


@durable_execution
def handler(event: dict, context: DurableContext) -> list[list[str]]:
result: BatchResult[list[str]] = context.map(
event["regions"],
process_region,
name="process-regions",
)
return result.to_dict()
19 changes: 19 additions & 0 deletions examples/python/operations/map/simple-map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)


def square(ctx: DurableContext, item: int, index: int, items: list[int]) -> int:
return ctx.step(lambda _: item * item, name=f"square-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[int]:
result: BatchResult[int] = context.map(
[1, 2, 3, 4, 5],
square,
name="square-numbers",
)
return result.to_dict()
Loading
Loading