From 0af1921a9597e5e39bba100656854e6bf303c3b9 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sat, 16 May 2026 09:25:56 -0700 Subject: [PATCH 1/2] Add support activities to Nexus operations --- .../temporal/client/ActivityClientImpl.java | 8 +- .../ActivityClientCallsInterceptor.java | 66 ++++ .../client/ActivityClientInternal.java | 16 + .../client/NexusStartActivityRequest.java | 82 +++++ .../client/NexusStartActivityResponse.java | 35 +++ .../client/RootActivityClientInvoker.java | 63 +++- .../internal/common/InternalUtils.java | 74 +++-- .../nexus/NexusStartActivityHelper.java | 67 ++++ .../internal/nexus/OperationToken.java | 23 ++ .../internal/nexus/OperationTokenType.java | 4 +- .../internal/nexus/OperationTokenUtil.java | 50 ++- .../nexus/CancelActivityExecutionInput.java | 23 ++ .../temporal/nexus/TemporalNexusClient.java | 290 ++++++++++++++++++ .../nexus/TemporalNexusClientImpl.java | 253 ++++++++++++++- .../nexus/TemporalOperationHandler.java | 55 +++- .../internal/nexus/WorkflowRunTokenTest.java | 62 ++++ .../nexus/TemporalNexusClientImplTest.java | 247 +++++++++++++++ .../nexus/AsyncActivityOperationTest.java | 217 +++++++++++++ .../CancelActivityAsyncOperationTest.java | 264 ++++++++++++++++ 19 files changed, 1847 insertions(+), 52 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/client/ActivityClientInternal.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityRequest.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityResponse.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartActivityHelper.java create mode 100644 temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java create mode 100644 temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncActivityOperationTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelActivityAsyncOperationTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityClientImpl.java index efcc9df698..77bf2bd794 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityClientImpl.java @@ -6,6 +6,7 @@ import io.temporal.common.interceptors.ActivityClientCallsInterceptor; import io.temporal.common.interceptors.ActivityClientInterceptor; import io.temporal.common.interceptors.Header; +import io.temporal.internal.client.ActivityClientInternal; import io.temporal.internal.client.ActivityHandleImpl; import io.temporal.internal.client.RootActivityClientInvoker; import io.temporal.internal.client.external.GenericWorkflowClientImpl; @@ -27,7 +28,7 @@ * Implementation of {@link ActivityClient} that delegates calls through the activity interceptor * chain and ultimately to the Temporal service. */ -class ActivityClientImpl implements ActivityClient { +class ActivityClientImpl implements ActivityClient, ActivityClientInternal { private final WorkflowServiceStubs stubs; private final ActivityClientOptions options; @@ -56,6 +57,11 @@ private static ActivityClientCallsInterceptor initializeClientInvoker( return invoker; } + @Override + public ActivityClientCallsInterceptor getInvoker() { + return invoker; + } + // ---- Interface-based start (Proc variants) ---- @Override diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptor.java index 16a34dc285..5e278969d5 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptor.java @@ -9,6 +9,8 @@ import io.temporal.common.Experimental; import java.lang.reflect.Type; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -116,19 +118,56 @@ GetActivityResultOutput getActivityResult(GetActivityResultInput input CompletableFuture> getActivityResultAsync( GetActivityResultInput input); + /** + * Nexus completion-callback metadata propagated through {@link StartActivityInput} when an + * activity is scheduled from a Nexus operation handler. + */ + @Experimental + final class CompletionCallback { + private final String url; + private final Map headers; + + public CompletionCallback(String url, Map headers) { + this.url = Objects.requireNonNull(url); + this.headers = Objects.requireNonNull(headers); + } + + public String getUrl() { + return url; + } + + public Map getHeaders() { + return headers; + } + } + @Experimental final class StartActivityInput { private final String activityType; private final List args; private final StartActivityOptions options; private final Header header; + private final @Nullable CompletionCallback completionCallback; + private final @Nullable List links; public StartActivityInput( String activityType, List args, StartActivityOptions options, Header header) { + this(activityType, args, options, header, null, null); + } + + public StartActivityInput( + String activityType, + List args, + StartActivityOptions options, + Header header, + @Nullable CompletionCallback completionCallback, + @Nullable List links) { this.activityType = activityType; this.args = args; this.options = options; this.header = header; + this.completionCallback = completionCallback; + this.links = links; } public String getActivityType() { @@ -146,6 +185,16 @@ public StartActivityOptions getOptions() { public Header getHeader() { return header; } + + @Nullable + public CompletionCallback getCompletionCallback() { + return completionCallback; + } + + @Nullable + public List getLinks() { + return links; + } } @Experimental @@ -153,9 +202,21 @@ final class StartActivityOutput { private final String activityId; private final @Nullable String activityRunId; + /** + * Set by the invoker when the start request included a Nexus completion callback; null + * otherwise. Internal use; do not depend on this in user-facing code. + */ + private final @Nullable String nexusOperationToken; + public StartActivityOutput(String activityId, @Nullable String activityRunId) { + this(activityId, activityRunId, null); + } + + public StartActivityOutput( + String activityId, @Nullable String activityRunId, @Nullable String nexusOperationToken) { this.activityId = activityId; this.activityRunId = activityRunId; + this.nexusOperationToken = nexusOperationToken; } public String getActivityId() { @@ -166,6 +227,11 @@ public String getActivityId() { public String getActivityRunId() { return activityRunId; } + + @Nullable + public String getNexusOperationToken() { + return nexusOperationToken; + } } @Experimental diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/ActivityClientInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/client/ActivityClientInternal.java new file mode 100644 index 0000000000..1652da00d2 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/ActivityClientInternal.java @@ -0,0 +1,16 @@ +package io.temporal.internal.client; + +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; + +/** + * Internal-only view of an {@code ActivityClient} that exposes the configured interceptor chain. + * + *

Lives in {@code io.temporal.internal.client} so that other internal SDK packages (e.g. {@code + * io.temporal.nexus}) can route a fully-constructed {@link + * ActivityClientCallsInterceptor.StartActivityInput} through the chain without bypassing + * user-registered interceptors or the metrics-tagged scope, and without forcing the concrete impl + * class to be public. + */ +public interface ActivityClientInternal { + ActivityClientCallsInterceptor getInvoker(); +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityRequest.java b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityRequest.java new file mode 100644 index 0000000000..4de1626408 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityRequest.java @@ -0,0 +1,82 @@ +package io.temporal.internal.client; + +import io.nexusrpc.Link; +import io.temporal.client.StartActivityOptions; +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.Header; +import java.util.List; +import java.util.Map; + +/** + * Request used to start an activity from a Nexus operation handler. Mirrors {@link + * NexusStartWorkflowRequest} but carries the activity-specific scheduling payload. + */ +@Experimental +public final class NexusStartActivityRequest { + private final String requestId; + private final String callbackUrl; + private final Map callbackHeaders; + private final String taskQueue; + private final List links; + private final String activityType; + private final List args; + private final StartActivityOptions options; + private final Header header; + + public NexusStartActivityRequest( + String requestId, + String callbackUrl, + Map callbackHeaders, + String taskQueue, + List links, + String activityType, + List args, + StartActivityOptions options, + Header header) { + this.requestId = requestId; + this.callbackUrl = callbackUrl; + this.callbackHeaders = callbackHeaders; + this.taskQueue = taskQueue; + this.links = links; + this.activityType = activityType; + this.args = args; + this.options = options; + this.header = header; + } + + public String getRequestId() { + return requestId; + } + + public String getCallbackUrl() { + return callbackUrl; + } + + public Map getCallbackHeaders() { + return callbackHeaders; + } + + public String getTaskQueue() { + return taskQueue; + } + + public List getLinks() { + return links; + } + + public String getActivityType() { + return activityType; + } + + public List getArgs() { + return args; + } + + public StartActivityOptions getOptions() { + return options; + } + + public Header getHeader() { + return header; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityResponse.java b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityResponse.java new file mode 100644 index 0000000000..6971ee550e --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartActivityResponse.java @@ -0,0 +1,35 @@ +package io.temporal.internal.client; + +import io.temporal.common.Experimental; +import javax.annotation.Nullable; + +/** + * Response returned from starting an activity via {@link NexusStartActivityRequest}. Mirrors {@link + * NexusStartWorkflowResponse}. + */ +@Experimental +public final class NexusStartActivityResponse { + private final String activityId; + private final @Nullable String runId; + private final String operationToken; + + public NexusStartActivityResponse( + String activityId, @Nullable String runId, String operationToken) { + this.activityId = activityId; + this.runId = runId; + this.operationToken = operationToken; + } + + public String getActivityId() { + return activityId; + } + + @Nullable + public String getRunId() { + return runId; + } + + public String getOperationToken() { + return operationToken; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java index 96a9e70f5a..f9678c1996 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java @@ -9,6 +9,8 @@ import io.grpc.StatusRuntimeException; import io.temporal.api.activity.v1.ActivityExecutionOutcome; import io.temporal.api.common.v1.ActivityType; +import io.temporal.api.common.v1.Callback; +import io.temporal.api.common.v1.Link; import io.temporal.api.common.v1.Payloads; import io.temporal.api.errordetails.v1.ActivityExecutionAlreadyStartedFailure; import io.temporal.api.sdk.v1.UserMetadata; @@ -19,6 +21,8 @@ import io.temporal.common.interceptors.ActivityClientCallsInterceptor; import io.temporal.internal.client.external.GenericWorkflowClient; import io.temporal.internal.common.HeaderUtils; +import io.temporal.internal.common.InternalUtils; +import io.temporal.internal.common.LinkConverter; import io.temporal.internal.common.ProtoConverters; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SearchAttributesUtil; @@ -28,7 +32,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Terminus of the activity interceptor chain. Implements all activity RPCs against the Temporal @@ -36,6 +43,8 @@ */ public class RootActivityClientInvoker implements ActivityClientCallsInterceptor { + private static final Logger log = LoggerFactory.getLogger(RootActivityClientInvoker.class); + private final GenericWorkflowClient genericClient; private final ActivityClientOptions clientOptions; @@ -104,6 +113,58 @@ public StartActivityOutput startActivity(StartActivityInput input) { io.temporal.api.common.v1.Header grpcHeader = HeaderUtils.toHeaderGrpc(input.getHeader(), null); request.setHeader(grpcHeader); + // Hoisted so it can be returned in StartActivityOutput when a Nexus callback is present. + String nexusOperationToken = null; + if (input.getCompletionCallback() != null) { + List protoLinks = null; + if (input.getLinks() != null) { + protoLinks = + input.getLinks().stream() + .map( + link -> { + if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor() + .getFullName() + .equals(link.getType())) { + io.temporal.api.nexus.v1.Link nexusLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setType(link.getType()) + .setUrl(link.getUri().toString()) + .build(); + return LinkConverter.nexusLinkToWorkflowEvent(nexusLink); + } else { + log.warn("ignoring unsupported link data type: {}", link.getType()); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (!protoLinks.isEmpty()) { + request.addAllLinks(protoLinks); + } else { + protoLinks = null; + } + } + // Generate the operation token from the user-supplied activity ID and namespace so the + // dual OPERATION_ID + OPERATION_TOKEN headers can be injected before the start RPC fires. + try { + nexusOperationToken = + io.temporal.internal.nexus.OperationTokenUtil.generateActivityExecutionOperationToken( + options.getId(), clientOptions.getNamespace()); + } catch (com.fasterxml.jackson.core.JsonProcessingException e) { + throw new io.nexusrpc.handler.HandlerException( + io.nexusrpc.handler.HandlerException.ErrorType.BAD_REQUEST, + "failed to generate activity operation token", + e); + } + Callback cb = + InternalUtils.buildNexusCallback( + input.getCompletionCallback().getUrl(), + input.getCompletionCallback().getHeaders(), + nexusOperationToken, + protoLinks); + request.addCompletionCallbacks(cb); + } + StartActivityExecutionResponse response; try { response = genericClient.startActivity(request.build()); @@ -121,7 +182,7 @@ public StartActivityOutput startActivity(StartActivityInput input) { } String runId = response.getRunId().isEmpty() ? null : response.getRunId(); - return new StartActivityOutput(options.getId(), runId); + return new StartActivityOutput(options.getId(), runId, nexusOperationToken); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index 4c5ec49b12..ee97e3bd5c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -115,35 +115,10 @@ public static NexusWorkflowStarter createNexusBoundStub( // If a callback URL is provided, pass it as a completion callback. if (!Strings.isNullOrEmpty(request.getCallbackUrl())) { - // Add the Nexus operation ID to the headers if it is not already present to support - // fabricating - // a NexusOperationStarted event if the completion is received before the response to a - // StartOperation request. - Map headers = - request.getCallbackHeaders().entrySet().stream() - .collect( - Collectors.toMap( - (k) -> k.getKey().toLowerCase(), - Map.Entry::getValue, - (a, b) -> a, - () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); - if (!headers.containsKey(Header.OPERATION_ID)) { - headers.put(Header.OPERATION_ID.toLowerCase(), operationToken); - } - if (!headers.containsKey(Header.OPERATION_TOKEN)) { - headers.put(Header.OPERATION_TOKEN.toLowerCase(), operationToken); - } - Callback.Builder cbBuilder = - Callback.newBuilder() - .setNexus( - Callback.Nexus.newBuilder() - .setUrl(request.getCallbackUrl()) - .putAllHeader(headers) - .build()); - if (links != null) { - cbBuilder.addAllLinks(links); - } - nexusWorkflowOptions.setCompletionCallbacks(Collections.singletonList(cbBuilder.build())); + Callback cb = + buildNexusCallback( + request.getCallbackUrl(), request.getCallbackHeaders(), operationToken, links); + nexusWorkflowOptions.setCompletionCallbacks(Collections.singletonList(cb)); } if (options.getTaskQueue() == null) { @@ -159,6 +134,47 @@ public static NexusWorkflowStarter createNexusBoundStub( return new NexusWorkflowStarter(stub.newInstance(nexusWorkflowOptions.build()), operationToken); } + /** + * Builds a {@link Callback} for use as a Nexus completion callback. Injects both the legacy + * {@code Nexus-Operation-Id} and the newer {@code Nexus-Operation-Token} headers + * (case-insensitive lookup) when not already present so the server can fabricate + * operation-started events if the completion is received before the response to a StartOperation + * request. + * + *

Shared by the workflow start path ({@link #createNexusBoundStub}) and the activity start + * path ({@code RootActivityClientInvoker.startActivity}). The dual {@code OPERATION_ID} + {@code + * OPERATION_TOKEN} headers must be injected before the start RPC is issued. + */ + @SuppressWarnings("deprecation") // Check the OPERATION_ID header for backwards compatibility + public static Callback buildNexusCallback( + String callbackUrl, + Map callbackHeaders, + String operationToken, + List protoLinks) { + Map headers = + callbackHeaders.entrySet().stream() + .collect( + Collectors.toMap( + (k) -> k.getKey().toLowerCase(), + Map.Entry::getValue, + (a, b) -> a, + () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); + if (!headers.containsKey(Header.OPERATION_ID)) { + headers.put(Header.OPERATION_ID.toLowerCase(), operationToken); + } + if (!headers.containsKey(Header.OPERATION_TOKEN)) { + headers.put(Header.OPERATION_TOKEN.toLowerCase(), operationToken); + } + Callback.Builder cbBuilder = + Callback.newBuilder() + .setNexus( + Callback.Nexus.newBuilder().setUrl(callbackUrl).putAllHeader(headers).build()); + if (protoLinks != null) { + cbBuilder.addAllLinks(protoLinks); + } + return cbBuilder.build(); + } + /** Check the method name for reserved prefixes or names. */ public static void checkMethodName(POJOWorkflowMethodMetadata methodMetadata) { if (methodMetadata.getName().startsWith(TEMPORAL_RESERVED_PREFIX)) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartActivityHelper.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartActivityHelper.java new file mode 100644 index 0000000000..3e6fd2cea6 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartActivityHelper.java @@ -0,0 +1,67 @@ +package io.temporal.internal.nexus; + +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.StartActivityOptions; +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.Header; +import io.temporal.internal.client.NexusStartActivityRequest; +import io.temporal.internal.client.NexusStartActivityResponse; +import java.util.List; +import java.util.function.Function; + +/** + * Shared helper for starting an activity from a Nexus operation and (in future) attaching links to + * the operation context. Mirrors {@link NexusStartWorkflowHelper} for activities. + */ +@Experimental +public class NexusStartActivityHelper { + + /** + * Starts an activity via the provided invoker function and returns the response. + * + *

The link-attachment block is intentionally a no-op in this revision; see the TODO inside. + * + * @param ctx the operation context (link attachment is deferred — see TODO) + * @param details the operation start details containing requestId, callback, links + * @param activityType the activity type name + * @param args the activity arguments + * @param options the activity scheduling options (must include task queue, ID) + * @param header the propagated header + * @param invoker function that starts the activity given a {@link NexusStartActivityRequest} + * @return the {@link NexusStartActivityResponse} containing the activity ID and operation token + */ + public static NexusStartActivityResponse startActivityAndAttachLinks( + OperationContext ctx, + OperationStartDetails details, + String activityType, + List args, + StartActivityOptions options, + Header header, + Function invoker) { + InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); + + NexusStartActivityRequest nexusRequest = + new NexusStartActivityRequest( + details.getRequestId(), + details.getCallbackUrl(), + details.getCallbackHeaders(), + nexusCtx.getTaskQueue(), + details.getLinks(), + activityType, + args, + options, + header); + + NexusStartActivityResponse response = invoker.apply(nexusRequest); + + // TODO: Attach activity-event link when server-side activity link support is available. + // No StartActivityResponseLink analog exists and there is no verified activity-event link path + // in LinkConverter. Do NOT copy the synthetic workflow-event link fabrication from + // NexusStartWorkflowHelper; Nexus operations function correctly without diagnostic links. + + return response; + } + + private NexusStartActivityHelper() {} +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java index 4bd5635e93..3593138a3e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java @@ -1,5 +1,6 @@ package io.temporal.internal.nexus; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -16,16 +17,24 @@ public class OperationToken { private final String namespace; @JsonProperty("wid") + @JsonInclude(JsonInclude.Include.NON_NULL) private final String workflowId; + @JsonProperty("aid") + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String activityId; + + @JsonCreator public OperationToken( @JsonProperty("t") Integer type, @JsonProperty("ns") String namespace, @JsonProperty("wid") String workflowId, + @JsonProperty("aid") String activityId, @JsonProperty("v") Integer version) { this.type = OperationTokenType.fromValue(type); this.namespace = namespace; this.workflowId = workflowId; + this.activityId = activityId; this.version = version; } @@ -33,6 +42,16 @@ public OperationToken(OperationTokenType type, String namespace, String workflow this.type = type; this.namespace = namespace; this.workflowId = workflowId; + this.activityId = null; + this.version = null; + } + + public OperationToken( + OperationTokenType type, String namespace, String workflowId, String activityId) { + this.type = type; + this.namespace = namespace; + this.workflowId = workflowId; + this.activityId = activityId; this.version = null; } @@ -51,4 +70,8 @@ public String getNamespace() { public String getWorkflowId() { return workflowId; } + + public String getActivityId() { + return activityId; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenType.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenType.java index 11aa57a81e..952d784bef 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenType.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenType.java @@ -5,7 +5,9 @@ public enum OperationTokenType { UNKNOWN(0), - WORKFLOW_RUN(1); + WORKFLOW_RUN(1), + // Values 2 and 3 are reserved for future token types (e.g. update-workflow, get-workflow-result). + ACTIVITY_EXECUTION(4); private final int value; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java index 737a84aad4..b2654a4f62 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java @@ -31,8 +31,20 @@ public static OperationToken loadOperationToken(String operationToken) { if (token.getVersion() != null && token.getVersion() != 0) { throw new IllegalArgumentException("Invalid operation token: unexpected version field"); } - if (Strings.isNullOrEmpty(token.getWorkflowId())) { - throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)"); + switch (token.getType()) { + case WORKFLOW_RUN: + if (Strings.isNullOrEmpty(token.getWorkflowId())) { + throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)"); + } + break; + case ACTIVITY_EXECUTION: + if (Strings.isNullOrEmpty(token.getActivityId())) { + throw new IllegalArgumentException("Invalid operation token: missing activity ID (aid)"); + } + break; + default: + throw new IllegalArgumentException( + "Invalid operation token: unknown operation token type: " + token.getType()); } return token; } @@ -61,6 +73,31 @@ public static String loadWorkflowIdFromOperationToken(String operationToken) { return loadWorkflowRunOperationToken(operationToken).getWorkflowId(); } + /** + * Load an activity execution operation token, asserting that the token type is {@link + * OperationTokenType#ACTIVITY_EXECUTION}. + * + * @throws IllegalArgumentException if the operation token is invalid or not an activity execution + * token + */ + public static OperationToken loadActivityExecutionOperationToken(String operationToken) { + OperationToken token = loadOperationToken(operationToken); + if (!token.getType().equals(OperationTokenType.ACTIVITY_EXECUTION)) { + throw new IllegalArgumentException( + "Invalid activity execution token: incorrect operation token type: " + token.getType()); + } + return token; + } + + /** + * Extract the activity ID from an activity execution operation token. + * + * @throws IllegalArgumentException if the operation token is invalid + */ + public static String loadActivityIdFromOperationToken(String operationToken) { + return loadActivityExecutionOperationToken(operationToken).getActivityId(); + } + /** Generate a workflow run operation token from a workflow ID and namespace. */ public static String generateWorkflowRunOperationToken(String workflowId, String namespace) throws JsonProcessingException { @@ -70,5 +107,14 @@ public static String generateWorkflowRunOperationToken(String workflowId, String return encoder.encodeToString(json.getBytes()); } + /** Generate an activity execution operation token from an activity ID and namespace. */ + public static String generateActivityExecutionOperationToken(String activityId, String namespace) + throws JsonProcessingException { + String json = + ow.writeValueAsString( + new OperationToken(OperationTokenType.ACTIVITY_EXECUTION, namespace, null, activityId)); + return encoder.encodeToString(json.getBytes()); + } + private OperationTokenUtil() {} } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java b/temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java new file mode 100644 index 0000000000..511f9fc012 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java @@ -0,0 +1,23 @@ +package io.temporal.nexus; + +import io.temporal.common.Experimental; +import java.util.Objects; + +/** + * Input to {@link TemporalOperationHandler#cancelActivityExecution} describing the activity + * execution to cancel. + */ +@Experimental +public final class CancelActivityExecutionInput { + + private final String activityId; + + public CancelActivityExecutionInput(String activityId) { + this.activityId = Objects.requireNonNull(activityId); + } + + /** Returns the activity ID extracted from the operation token. */ + public String getActivityId() { + return activityId; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java index 4eed1fe350..9fe00a209e 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java @@ -1,5 +1,6 @@ package io.temporal.nexus; +import io.temporal.client.StartActivityOptions; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.common.Experimental; @@ -507,4 +508,293 @@ TemporalOperationResult startWorkflow( Type resultType, WorkflowOptions options, Object... args); + + // ---------- Activity overloads ---------- + + /** + * Starts a zero-argument activity that returns a value. + * + *

Example: + * + *

{@code
+   * client.startActivity(MyActivity.class, MyActivity::run, options)
+   * }
+ * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the activity return type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func1 activityMethod, + StartActivityOptions options); + + /** + * Starts a one-argument activity that returns a value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the activity return type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func2 activityMethod, + A1 arg1, + StartActivityOptions options); + + /** + * Starts a two-argument activity that returns a value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @param the activity return type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func3 activityMethod, + A1 arg1, + A2 arg2, + StartActivityOptions options); + + /** + * Starts a three-argument activity that returns a value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param arg3 third activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @param the type of the third activity argument + * @param the activity return type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func4 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + StartActivityOptions options); + + /** + * Starts a four-argument activity that returns a value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param arg3 third activity argument + * @param arg4 fourth activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @param the type of the third activity argument + * @param the type of the fourth activity argument + * @param the activity return type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func5 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + StartActivityOptions options); + + /** + * Starts a five-argument activity that returns a value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param arg3 third activity argument + * @param arg4 fourth activity argument + * @param arg5 fifth activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @param the type of the third activity argument + * @param the type of the fourth activity argument + * @param the type of the fifth activity argument + * @param the activity return type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func6 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + StartActivityOptions options); + + /** + * Starts a zero-argument activity with no return value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, Functions.Proc1 activityMethod, StartActivityOptions options); + + /** + * Starts a one-argument activity with no return value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc2 activityMethod, + A1 arg1, + StartActivityOptions options); + + /** + * Starts a two-argument activity with no return value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc3 activityMethod, + A1 arg1, + A2 arg2, + StartActivityOptions options); + + /** + * Starts a three-argument activity with no return value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param arg3 third activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @param the type of the third activity argument + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc4 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + StartActivityOptions options); + + /** + * Starts a four-argument activity with no return value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param arg3 third activity argument + * @param arg4 fourth activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @param the type of the third activity argument + * @param the type of the fourth activity argument + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc5 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + StartActivityOptions options); + + /** + * Starts a five-argument activity with no return value. + * + * @param activityInterface the activity interface class + * @param activityMethod unbound method reference to the activity method + * @param arg1 first activity argument + * @param arg2 second activity argument + * @param arg3 third activity argument + * @param arg4 fourth activity argument + * @param arg5 fifth activity argument + * @param options activity start options (must include taskQueue) + * @param the activity interface type + * @param the type of the first activity argument + * @param the type of the second activity argument + * @param the type of the third activity argument + * @param the type of the fourth activity argument + * @param the type of the fifth activity argument + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc6 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + StartActivityOptions options); + + /** + * Starts an activity using an untyped activity type name. + * + *

Example: + * + *

{@code
+   * client.startActivity("MyActivity", String.class, options, input)
+   * }
+ * + * @param activityType the activity type name string + * @param resultClass the expected result class + * @param options activity start options (must include taskQueue) + * @param args activity arguments + * @param the activity return type + * @return an async {@link TemporalOperationResult} with the activity-execution operation token + */ + TemporalOperationResult startActivity( + String activityType, Class resultClass, StartActivityOptions options, Object... args); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java index 43e29e18f5..8e84883381 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java @@ -3,14 +3,27 @@ import io.nexusrpc.handler.HandlerException; import io.nexusrpc.handler.OperationContext; import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.ActivityClient; +import io.temporal.client.ActivityClientOptions; +import io.temporal.client.StartActivityOptions; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; import io.temporal.common.Experimental; +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.Header; +import io.temporal.internal.client.ActivityClientInternal; +import io.temporal.internal.client.NexusStartActivityResponse; import io.temporal.internal.client.NexusStartWorkflowResponse; +import io.temporal.internal.nexus.NexusStartActivityHelper; import io.temporal.internal.nexus.NexusStartWorkflowHelper; +import io.temporal.internal.util.MethodExtractor; import io.temporal.workflow.Functions; +import java.lang.reflect.Method; import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -244,13 +257,7 @@ public TemporalOperationResult startWorkflow( } private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { - if (!asyncOperationStarted.compareAndSet(false, true)) { - throw new HandlerException( - HandlerException.ErrorType.BAD_REQUEST, - new IllegalStateException( - "Only one async operation can be started per operation handler invocation. " - + "Use getWorkflowClient() for additional workflow interactions.")); - } + claimAsyncSlot(); try { NexusStartWorkflowResponse response = NexusStartWorkflowHelper.startWorkflowAndAttachLinks( @@ -265,4 +272,236 @@ private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) throw t; } } + + // ---------- Activity overloads (Func returning) ---------- + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func1 activityMethod, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Collections.emptyList(), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func2 activityMethod, + A1 arg1, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Collections.singletonList(arg1), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func3 activityMethod, + A1 arg1, + A2 arg2, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func4 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2, arg3), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func5 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2, arg3, arg4), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Func6 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2, arg3, arg4, arg5), options); + } + + // ---------- Activity overloads (Proc void) ---------- + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, Functions.Proc1 activityMethod, StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Collections.emptyList(), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc2 activityMethod, + A1 arg1, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Collections.singletonList(arg1), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc3 activityMethod, + A1 arg1, + A2 arg2, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc4 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2, arg3), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc5 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2, arg3, arg4), options); + } + + @Override + public TemporalOperationResult startActivity( + Class activityInterface, + Functions.Proc6 activityMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + StartActivityOptions options) { + Method method = MethodExtractor.extract(activityInterface, activityMethod); + String activityType = MethodExtractor.activityTypeName(activityInterface, method); + return startActivityImpl(activityType, Arrays.asList(arg1, arg2, arg3, arg4, arg5), options); + } + + // ---------- Activity untyped ---------- + + @Override + public TemporalOperationResult startActivity( + String activityType, Class resultClass, StartActivityOptions options, Object... args) { + List argList = args == null ? Collections.emptyList() : Arrays.asList(args); + return startActivityImpl(activityType, argList, options); + } + + private TemporalOperationResult startActivityImpl( + String activityType, List args, StartActivityOptions options) { + claimAsyncSlot(); + try { + NexusStartActivityResponse response = + NexusStartActivityHelper.startActivityAndAttachLinks( + operationContext, + operationStartDetails, + activityType, + args, + options, + Header.empty(), + request -> { + ActivityClientCallsInterceptor.CompletionCallback cb = + (request.getCallbackUrl() == null || request.getCallbackUrl().isEmpty()) + ? null + : new ActivityClientCallsInterceptor.CompletionCallback( + request.getCallbackUrl(), request.getCallbackHeaders()); + ActivityClientCallsInterceptor.StartActivityInput input = + new ActivityClientCallsInterceptor.StartActivityInput( + request.getActivityType(), + request.getArgs(), + request.getOptions(), + request.getHeader(), + cb, + request.getLinks()); + // Build an ActivityClient that mirrors the surrounding WorkflowClient's options + // so that the metrics-tagged scope built by the impl is preserved. setIdentity is + // load-bearing because the cancel RPC reads it from clientOptions.getIdentity(). + ActivityClient activityClient = + ActivityClient.newInstance( + client.getWorkflowServiceStubs(), + ActivityClientOptions.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setDataConverter(client.getOptions().getDataConverter()) + .setIdentity(client.getOptions().getIdentity()) + .build()); + ActivityClientCallsInterceptor.StartActivityOutput out = + ((ActivityClientInternal) activityClient).getInvoker().startActivity(input); + // The invoker generated and injected the token into the callback headers before + // the start RPC; read it back from the output instead of regenerating it. + String token = out.getNexusOperationToken(); + if (token == null) { + throw new HandlerException( + HandlerException.ErrorType.INTERNAL, + "invoker did not return a Nexus operation token for activity start with callback", + new IllegalStateException( + "nexusOperationToken is null on StartActivityOutput when CompletionCallback was set")); + } + return new NexusStartActivityResponse( + out.getActivityId(), out.getActivityRunId(), token); + }); + return TemporalOperationResult.async(response.getOperationToken()); + } catch (Throwable t) { + // Reset on failure so that if the activity start throws, the handler can retry without + // being blocked by the guard. + asyncOperationStarted.set(false); + throw t; + } + } + + private void claimAsyncSlot() { + if (!asyncOperationStarted.compareAndSet(false, true)) { + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + new IllegalStateException( + "Only one async operation can be started per operation handler invocation. " + + "Use getWorkflowClient() for additional workflow interactions.")); + } + } } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java index 6a01d11fc6..1c7e57f402 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -1,17 +1,19 @@ package io.temporal.nexus; import io.nexusrpc.handler.*; +import io.temporal.client.ActivityClient; +import io.temporal.client.ActivityClientOptions; import io.temporal.client.WorkflowClient; import io.temporal.common.Experimental; import io.temporal.internal.nexus.CurrentNexusOperationContext; import io.temporal.internal.nexus.InternalNexusOperationContext; import io.temporal.internal.nexus.OperationToken; -import io.temporal.internal.nexus.OperationTokenType; import io.temporal.internal.nexus.OperationTokenUtil; /** * Generic Nexus operation handler backed by Temporal. Implements {@link OperationHandler} and - * provides a composable way to map Temporal operations (start workflow, etc.) to Nexus operations. + * provides a composable way to map Temporal operations (start workflow, start activity, etc.) to + * Nexus operations. * *

Usage example: * @@ -30,8 +32,10 @@ * } * *

This class supports subclassing to customize cancel behavior. Override {@link - * #cancelWorkflowRun} to change how workflow-run cancellations are handled. The {@link #start} and - * {@link #cancel} methods should not be overridden — they contain the core dispatch logic. + * #cancelWorkflowRun} to change how workflow-run (token type {@code t:1}) cancellations are + * handled, or {@link #cancelActivityExecution} to change how activity-execution (token type {@code + * t:4}) cancellations are handled. The {@link #start} and {@link #cancel} methods should not be + * overridden — they contain the core dispatch logic. * * @param the input type * @param the result type @@ -59,7 +63,7 @@ protected TemporalOperationHandler(StartHandler startHandler) { /** * Creates a {@link TemporalOperationHandler} from a start handler. Subclass and override {@link - * #cancelWorkflowRun} to customize cancel behavior. + * #cancelWorkflowRun} or {@link #cancelActivityExecution} to customize cancel behavior. * * @param startHandler the handler to invoke on start operation requests * @return an operation handler backed by the given start handler @@ -100,12 +104,18 @@ public final void cancel(OperationContext ctx, OperationCancelDetails details) { } TemporalOperationCancelContext cancelContext = new TemporalOperationCancelContext(ctx, details); - if (token.getType() == OperationTokenType.WORKFLOW_RUN) { - cancelWorkflowRun(cancelContext, new CancelWorkflowRunInput(token.getWorkflowId())); - } else { - throw new HandlerException( - HandlerException.ErrorType.BAD_REQUEST, - new IllegalArgumentException("unsupported operation token type: " + token.getType())); + switch (token.getType()) { + case WORKFLOW_RUN: + cancelWorkflowRun(cancelContext, new CancelWorkflowRunInput(token.getWorkflowId())); + break; + case ACTIVITY_EXECUTION: + cancelActivityExecution( + cancelContext, new CancelActivityExecutionInput(token.getActivityId())); + break; + default: + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + new IllegalArgumentException("unsupported operation token type: " + token.getType())); } } @@ -123,4 +133,27 @@ protected void cancelWorkflowRun( WorkflowClient client = CurrentNexusOperationContext.get().getWorkflowClient(); client.newUntypedWorkflowStub(input.getWorkflowId()).cancel(); } + + /** + * Called when a cancel request is received for an activity-execution token (type=4). Override to + * customize cancel behavior. + * + *

Default behavior: requests cancellation of the underlying standalone activity execution. + * + * @param context the cancel context + * @param input describes the activity execution to cancel + */ + protected void cancelActivityExecution( + TemporalOperationCancelContext context, CancelActivityExecutionInput input) { + WorkflowClient wc = CurrentNexusOperationContext.get().getWorkflowClient(); + ActivityClient ac = + ActivityClient.newInstance( + wc.getWorkflowServiceStubs(), + ActivityClientOptions.newBuilder() + .setNamespace(wc.getOptions().getNamespace()) + .setDataConverter(wc.getOptions().getDataConverter()) + .setIdentity(wc.getOptions().getIdentity()) + .build()); + ac.getHandle(input.getActivityId(), null).cancel(); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java index 1f22fe8c2e..84caf9e483 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java @@ -92,6 +92,68 @@ public void loadWorkflowIdFromGoOperationToken() { Assert.assertEquals(OperationTokenType.WORKFLOW_RUN, token.getType()); } + @Test + public void roundTripActivityExecutionToken() throws JsonProcessingException { + String encoded = OperationTokenUtil.generateActivityExecutionOperationToken("act-1", "ns"); + OperationToken token = OperationTokenUtil.loadOperationToken(encoded); + Assert.assertEquals(OperationTokenType.ACTIVITY_EXECUTION, token.getType()); + Assert.assertEquals("act-1", token.getActivityId()); + Assert.assertEquals("ns", token.getNamespace()); + Assert.assertNull(token.getWorkflowId()); + Assert.assertNull(token.getVersion()); + + // Also exercise the symmetric activityId loader. + Assert.assertEquals("act-1", OperationTokenUtil.loadActivityIdFromOperationToken(encoded)); + } + + @Test + public void workflowRunTokenBytesByteIdenticalSnapshot() throws JsonProcessingException { + String encoded = OperationTokenUtil.generateWorkflowRunOperationToken("wf-1", "ns"); + // The encoded token must remain byte-identical for cross-SDK compatibility. + // Snapshot captured from the current (pre-aid) SDK output. + Assert.assertEquals("eyJ0IjoxLCJucyI6Im5zIiwid2lkIjoid2YtMSJ9", encoded); + } + + @Test + public void malformedActivityTokenRejected() { + String malformed = "{\"t\":4,\"ns\":\"ns\",\"aid\":\"\"}"; + IllegalArgumentException ex = + Assert.assertThrows( + IllegalArgumentException.class, + () -> + OperationTokenUtil.loadOperationToken( + encoder.encodeToString(malformed.getBytes()))); + Assert.assertTrue( + "expected 'missing activity ID' in message but got: " + ex.getMessage(), + ex.getMessage().contains("missing activity ID")); + } + + @Test + public void unknownOperationTokenTypeRejected() { + String unknown = "{\"t\":7,\"ns\":\"ns\",\"wid\":\"x\"}"; + IllegalArgumentException ex = + Assert.assertThrows( + IllegalArgumentException.class, + () -> + OperationTokenUtil.loadOperationToken(encoder.encodeToString(unknown.getBytes()))); + Assert.assertTrue( + "expected 'unknown operation token type' in message but got: " + ex.getMessage(), + ex.getMessage().contains("unknown operation token type")); + } + + @Test + public void loadWorkflowRunRejectsActivityToken() throws JsonProcessingException { + String activityToken = + OperationTokenUtil.generateActivityExecutionOperationToken("act-1", "ns"); + IllegalArgumentException ex = + Assert.assertThrows( + IllegalArgumentException.class, + () -> OperationTokenUtil.loadWorkflowRunOperationToken(activityToken)); + Assert.assertTrue( + "expected 'incorrect operation token type' in message but got: " + ex.getMessage(), + ex.getMessage().contains("incorrect operation token type")); + } + @Test public void loadWorkflowIdFromBadOperationToken() { // Bad token, empty json diff --git a/temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java b/temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java new file mode 100644 index 0000000000..0e383d036f --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java @@ -0,0 +1,247 @@ +package io.temporal.nexus; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.uber.m3.tally.NoopScope; +import io.nexusrpc.handler.HandlerException; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.StartActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.InternalNexusOperationContext; +import java.time.Duration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Pure unit tests for {@link TemporalNexusClientImpl#claimAsyncSlot()} semantics. These run without + * a Temporal server (no {@link io.temporal.testing.internal.SDKTestWorkflowRule}). + * + *

The {@code claimAsyncSlot} guard fires as the very first statement in both {@code + * startActivityImpl} and {@code invokeAndReturn}, so the second call always throws {@link + * HandlerException}({@link HandlerException.ErrorType#BAD_REQUEST}) regardless of whether the first + * call's downstream RPC succeeded. + */ +public class TemporalNexusClientImplTest { + + private static final String NAMESPACE = "test-namespace"; + private static final String TASK_QUEUE = "test-task-queue"; + private static final String ENDPOINT = "test-endpoint"; + + private TemporalNexusClientImpl client; + + @Before + public void setUp() { + // Mock WorkflowClient so constructor doesn't need a live server. + WorkflowClient workflowClient = mock(WorkflowClient.class); + WorkflowClientOptions clientOptions = mock(WorkflowClientOptions.class); + when(workflowClient.getOptions()).thenReturn(clientOptions); + when(clientOptions.getNamespace()).thenReturn(NAMESPACE); + + OperationContext operationContext = mock(OperationContext.class); + when(operationContext.getService()).thenReturn("TestService"); + when(operationContext.getOperation()).thenReturn("testOperation"); + + OperationStartDetails operationStartDetails = + OperationStartDetails.newBuilder() + .setCallbackUrl("http://localhost/callback") + .setRequestId("test-request-id") + .build(); + + client = new TemporalNexusClientImpl(workflowClient, operationContext, operationStartDetails); + + // Set up the thread-local nexus context required by NexusStartActivityHelper and + // NexusStartWorkflowHelper deep in the call stack. + InternalNexusOperationContext internalCtx = + new InternalNexusOperationContext( + NAMESPACE, TASK_QUEUE, ENDPOINT, new NoopScope(), workflowClient); + CurrentNexusOperationContext.set(internalCtx); + } + + @After + public void tearDown() { + CurrentNexusOperationContext.unset(); + } + + // ---------- Activity double-start ---------- + + @Test + public void doubleStartActivity_secondCallThrowsBadRequest() { + StartActivityOptions options = + StartActivityOptions.newBuilder() + .setId("act-1") + .setTaskQueue(TASK_QUEUE) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build(); + + // First call: claimAsyncSlot() succeeds (sets flag), RPC may throw — we don't care. + try { + client.startActivity(TestActivity.class, TestActivity::doSomething, options); + } catch (HandlerException e) { + // If a HandlerException leaks out of the first call it must NOT be BAD_REQUEST + // from claimAsyncSlot — that would mean the flag was already set before setUp. + Assert.assertNotEquals( + "First startActivity must not fail with BAD_REQUEST (claimAsyncSlot guard)", + HandlerException.ErrorType.BAD_REQUEST, + e.getErrorType()); + } catch (Exception ignored) { + // Any other exception from the RPC layer is expected; the important thing is + // claimAsyncSlot already ran and set asyncOperationStarted = true. + } + + // Second call: claimAsyncSlot() sees the flag and must throw BAD_REQUEST immediately. + HandlerException ex = + Assert.assertThrows( + HandlerException.class, + () -> + client.startActivity( + TestActivity.class, + TestActivity::doSomething, + StartActivityOptions.newBuilder() + .setId("act-2") + .setTaskQueue(TASK_QUEUE) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build())); + + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, ex.getErrorType()); + Assert.assertTrue( + "Message should contain 'Only one async operation'", + ex.getCause() instanceof IllegalStateException + && ex.getCause() + .getMessage() + .startsWith("Only one async operation can be started per operation handler")); + } + + // ---------- Workflow double-start ---------- + + @Test + public void doubleStartWorkflow_secondCallThrowsBadRequest() { + WorkflowOptions options = + WorkflowOptions.newBuilder().setWorkflowId("wf-1").setTaskQueue(TASK_QUEUE).build(); + + // First call: claimAsyncSlot() succeeds, downstream RPC may throw — we don't care. + try { + client.startWorkflow(BlockingWorkflow.class, BlockingWorkflow::execute, "input", options); + } catch (HandlerException e) { + Assert.assertNotEquals( + "First startWorkflow must not fail with BAD_REQUEST (claimAsyncSlot guard)", + HandlerException.ErrorType.BAD_REQUEST, + e.getErrorType()); + } catch (Exception ignored) { + // Expected: no live server. + } + + // Second call must throw BAD_REQUEST immediately. + HandlerException ex = + Assert.assertThrows( + HandlerException.class, + () -> + client.startWorkflow( + BlockingWorkflow.class, + BlockingWorkflow::execute, + "input", + WorkflowOptions.newBuilder() + .setWorkflowId("wf-2") + .setTaskQueue(TASK_QUEUE) + .build())); + + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, ex.getErrorType()); + Assert.assertTrue( + "Message should contain 'Only one async operation'", + ex.getCause() instanceof IllegalStateException + && ex.getCause() + .getMessage() + .startsWith("Only one async operation can be started per operation handler")); + } + + // ---------- Mixed: workflow then activity ---------- + + @Test + public void startWorkflowThenActivity_activityThrowsBadRequest() { + WorkflowOptions wfOptions = + WorkflowOptions.newBuilder().setWorkflowId("wf-mixed-1").setTaskQueue(TASK_QUEUE).build(); + + try { + client.startWorkflow(BlockingWorkflow.class, BlockingWorkflow::execute, "in", wfOptions); + } catch (HandlerException e) { + Assert.assertNotEquals(HandlerException.ErrorType.BAD_REQUEST, e.getErrorType()); + } catch (Exception ignored) { + } + + HandlerException ex = + Assert.assertThrows( + HandlerException.class, + () -> + client.startActivity( + TestActivity.class, + TestActivity::doSomething, + StartActivityOptions.newBuilder() + .setId("act-mixed-1") + .setTaskQueue(TASK_QUEUE) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build())); + + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, ex.getErrorType()); + } + + // ---------- Mixed: activity then workflow ---------- + + @Test + public void startActivityThenWorkflow_workflowThrowsBadRequest() { + StartActivityOptions actOptions = + StartActivityOptions.newBuilder() + .setId("act-mixed-2") + .setTaskQueue(TASK_QUEUE) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build(); + + try { + client.startActivity(TestActivity.class, TestActivity::doSomething, actOptions); + } catch (HandlerException e) { + Assert.assertNotEquals(HandlerException.ErrorType.BAD_REQUEST, e.getErrorType()); + } catch (Exception ignored) { + } + + HandlerException ex = + Assert.assertThrows( + HandlerException.class, + () -> + client.startWorkflow( + BlockingWorkflow.class, + BlockingWorkflow::execute, + "in", + WorkflowOptions.newBuilder() + .setWorkflowId("wf-mixed-2") + .setTaskQueue(TASK_QUEUE) + .build())); + + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, ex.getErrorType()); + } + + // ---------- Minimal stubs ---------- + + @io.temporal.activity.ActivityInterface + public interface TestActivity { + @io.temporal.activity.ActivityMethod + void doSomething(); + } + + @io.temporal.workflow.WorkflowInterface + public interface BlockingWorkflow { + @io.temporal.workflow.WorkflowMethod + String execute(String input); + } + + public static class BlockingWorkflowImpl implements BlockingWorkflow { + @Override + public String execute(String input) { + return input; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncActivityOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncActivityOperationTest.java new file mode 100644 index 0000000000..455ea9f3d6 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncActivityOperationTest.java @@ -0,0 +1,217 @@ +package io.temporal.workflow.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.nexusrpc.handler.HandlerException; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.client.StartActivityOptions; +import io.temporal.client.WorkflowFailedException; +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.NexusOperationFailure; +import io.temporal.internal.nexus.OperationToken; +import io.temporal.internal.nexus.OperationTokenType; +import io.temporal.internal.nexus.OperationTokenUtil; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class AsyncActivityOperationTest extends BaseNexusTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class, DoubleStartNexus.class) + .setActivityImplementations(new TestActivityImpl()) + .setNexusServiceImplementation( + new TestNexusServiceImpl(), new DoubleStartNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Test + public void testActivityOperationEndToEnd() { + // The in-process test-server does not implement the StartActivityExecution RPC; the + // standalone-activity Nexus path requires a real server. Unit-only token assertions stay + // active in testActivityOperationTokenRoundTrip below. + assumeTrue(SDKTestWorkflowRule.useExternalService); + // Combined (a) caller workflow receives "hello " + input + // (b) opToken loads as ACTIVITY_EXECUTION with aid="act-" + requestId + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute("world"); + Assert.assertEquals("hello world", result); + } + + @Test + public void testActivityOperationTokenRoundTrip() throws Exception { + // (c) generateActivityExecutionOperationToken then loadOperationToken round-trips equal + // aid + ns. + String token = OperationTokenUtil.generateActivityExecutionOperationToken("act-123", "test-ns"); + OperationToken loaded = OperationTokenUtil.loadOperationToken(token); + Assert.assertEquals(OperationTokenType.ACTIVITY_EXECUTION, loaded.getType()); + Assert.assertEquals("act-123", loaded.getActivityId()); + Assert.assertEquals("test-ns", loaded.getNamespace()); + Assert.assertNull(loaded.getWorkflowId()); + } + + @Test + public void testDoubleStartActivityThrows() { + // The first start RPC requires StartActivityExecution which is not implemented by the + // in-process test server; gate this on a real server so the guard at the second call can be + // observed. + assumeTrue(SDKTestWorkflowRule.useExternalService); + // (d) Calling startActivity twice in one handler invocation throws + // HandlerException(BAD_REQUEST). + TestDoubleStartWorkflow workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestDoubleStartWorkflow.class); + WorkflowFailedException e = + Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("anything")); + Assert.assertTrue(e.getCause() instanceof NexusOperationFailure); + NexusOperationFailure nexusFailure = (NexusOperationFailure) e.getCause(); + Assert.assertTrue(nexusFailure.getCause() instanceof HandlerException); + HandlerException handlerException = (HandlerException) nexusFailure.getCause(); + Assert.assertTrue(handlerException.getCause() instanceof ApplicationFailure); + ApplicationFailure appFailure = (ApplicationFailure) handlerException.getCause(); + Assert.assertEquals("java.lang.IllegalStateException", appFailure.getType()); + Assert.assertTrue( + appFailure + .getOriginalMessage() + .startsWith("Only one async operation can be started per operation handler")); + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + String process(String input); + } + + public static class TestActivityImpl implements TestActivity { + @Override + public String process(String input) { + return "hello " + input; + } + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(20)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + TestNexusServices.TestNexusService1 serviceStub = + Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); + + // Drive a handle so we can inspect the operation token for assertion (b). + NexusOperationHandle handle = + Workflow.startNexusOperation(serviceStub::operation, input); + NexusOperationExecution exec = handle.getExecution().get(); + Assert.assertTrue("Operation token should be present", exec.getOperationToken().isPresent()); + OperationToken token = OperationTokenUtil.loadOperationToken(exec.getOperationToken().get()); + Assert.assertEquals(OperationTokenType.ACTIVITY_EXECUTION, token.getType()); + Assert.assertTrue( + "activityId should start with 'act-' (got " + token.getActivityId() + ")", + token.getActivityId() != null && token.getActivityId().startsWith("act-")); + + return handle.getResult().get(); + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> + client.startActivity( + TestActivity.class, + TestActivity::process, + input, + StartActivityOptions.newBuilder() + .setId("act-" + context.getRequestId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build())); + } + } + + // ---------- Double-start test scaffolding ---------- + + public static class DoubleStartNexus implements TestDoubleStartWorkflow { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + DoubleStartNexusService stub = + Workflow.newNexusServiceStub(DoubleStartNexusService.class, serviceOptions); + return stub.operation(input); + } + } + + @WorkflowInterface + public interface TestDoubleStartWorkflow { + @WorkflowMethod + String execute(String input); + } + + @io.nexusrpc.Service + public interface DoubleStartNexusService { + @io.nexusrpc.Operation + String operation(String input); + } + + @ServiceImpl(service = DoubleStartNexusService.class) + public class DoubleStartNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> { + // First start should succeed. + client.startActivity( + TestActivity.class, + TestActivity::process, + input, + StartActivityOptions.newBuilder() + .setId("act-first-" + context.getRequestId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build()); + // Second start must throw HandlerException(BAD_REQUEST). + return client.startActivity( + TestActivity.class, + TestActivity::process, + input, + StartActivityOptions.newBuilder() + .setId("act-second-" + context.getRequestId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build()); + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelActivityAsyncOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelActivityAsyncOperationTest.java new file mode 100644 index 0000000000..45a2b78794 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelActivityAsyncOperationTest.java @@ -0,0 +1,264 @@ +package io.temporal.workflow.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.client.ActivityCompletionException; +import io.temporal.client.StartActivityOptions; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowStub; +import io.temporal.common.RetryOptions; +import io.temporal.failure.CanceledFailure; +import io.temporal.failure.NexusOperationFailure; +import io.temporal.nexus.CancelActivityExecutionInput; +import io.temporal.nexus.TemporalOperationCancelContext; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class CancelActivityAsyncOperationTest extends BaseNexusTest { + + static final AtomicBoolean cancelled = new AtomicBoolean(false); + static final AtomicBoolean customCancelInvoked = new AtomicBoolean(false); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(DefaultCancelNexus.class, OverriddenCancelNexus.class) + .setActivityImplementations(new HeartbeatingActivityImpl()) + .setNexusServiceImplementation( + new DefaultCancelNexusServiceImpl(), new OverriddenCancelNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Before + public void resetState() { + cancelled.set(false); + customCancelInvoked.set(false); + } + + @Test(timeout = 60_000) + public void testDefaultActivityCancel() { + // Standalone-activity Nexus path requires a real Temporal server; the in-process test server + // does not implement StartActivityExecution. + assumeTrue(SDKTestWorkflowRule.useExternalService); + WorkflowStub stub = + testWorkflowRule.newUntypedWorkflowStubTimeoutOptions("DefaultCancelCaller"); + stub.start(testWorkflowRule.getTaskQueue()); + // Either succeeds with "ok" or throws a workflow failure; we only care about the cancel path. + try { + stub.getResult(String.class); + } catch (WorkflowFailedException ignored) { + // Acceptable: caller workflow may surface the cancel as failure. + } + // History contains EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED on the caller workflow. + testWorkflowRule.assertHistoryEvent( + stub.getExecution().getWorkflowId(), + io.temporal.api.enums.v1.EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED); + // Activity worker observed cancel. + Assert.assertTrue("activity should have observed cancel", cancelled.get()); + } + + @Test(timeout = 60_000) + public void testOverriddenActivityCancel() { + // Same constraint as testDefaultActivityCancel. + assumeTrue(SDKTestWorkflowRule.useExternalService); + WorkflowStub stub = + testWorkflowRule.newUntypedWorkflowStubTimeoutOptions("OverriddenCancelCaller"); + stub.start(testWorkflowRule.getTaskQueue()); + try { + stub.getResult(String.class); + } catch (WorkflowFailedException ignored) { + // Acceptable. + } + // The overriding handler should have been called. + Assert.assertTrue("override should have been invoked", customCancelInvoked.get()); + // Default cancel was not invoked: activity ran until startToCloseTimeout fired + // (no ActivityCompletionException raised on the worker). + Assert.assertFalse("default cancel should not have been invoked", cancelled.get()); + } + + @ActivityInterface + public interface HeartbeatingActivity { + @ActivityMethod + String process(String input); + } + + public static class HeartbeatingActivityImpl implements HeartbeatingActivity { + @Override + public String process(String input) { + while (true) { + try { + Activity.getExecutionContext().heartbeat(null); + } catch (ActivityCompletionException ex) { + cancelled.set(true); + throw ex; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + return "done"; + } + } + + @WorkflowInterface + public interface DefaultCancelCaller { + @WorkflowMethod + String execute(String taskQueue); + } + + @WorkflowInterface + public interface OverriddenCancelCaller { + @WorkflowMethod + String execute(String taskQueue); + } + + @io.nexusrpc.Service + public interface DefaultCancelNexusService { + @io.nexusrpc.Operation + String operation(String taskQueue); + } + + @io.nexusrpc.Service + public interface OverrideCancelNexusService { + @io.nexusrpc.Operation + String operation(String taskQueue); + } + + // ---- Default cancel ---- + + public static class DefaultCancelNexus implements DefaultCancelCaller { + @Override + public String execute(String taskQueue) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(15)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + DefaultCancelNexusService stub = + Workflow.newNexusServiceStub(DefaultCancelNexusService.class, serviceOptions); + try { + Workflow.newCancellationScope( + () -> { + NexusOperationHandle handle = + Workflow.startNexusOperation(stub::operation, taskQueue); + handle.getExecution().get(); + CancellationScope.current().cancel(); + handle.getResult().get(); + }) + .run(); + } catch (NexusOperationFailure failure) { + if (!(failure.getCause() instanceof CanceledFailure)) { + throw failure; + } + } + return "ok"; + } + } + + @ServiceImpl(service = DefaultCancelNexusService.class) + public class DefaultCancelNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> + client.startActivity( + HeartbeatingActivity.class, + HeartbeatingActivity::process, + input, + StartActivityOptions.newBuilder() + .setId("act-" + context.getRequestId()) + .setTaskQueue(input) + .setStartToCloseTimeout(Duration.ofSeconds(60)) + .setHeartbeatTimeout(Duration.ofSeconds(2)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build())); + } + } + + // ---- Overridden cancel ---- + + public static class OverriddenCancelNexus implements OverriddenCancelCaller { + @Override + public String execute(String taskQueue) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(15)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + OverrideCancelNexusService stub = + Workflow.newNexusServiceStub(OverrideCancelNexusService.class, serviceOptions); + try { + Workflow.newCancellationScope( + () -> { + NexusOperationHandle handle = + Workflow.startNexusOperation(stub::operation, taskQueue); + handle.getExecution().get(); + CancellationScope.current().cancel(); + handle.getResult().get(); + }) + .run(); + } catch (NexusOperationFailure failure) { + if (!(failure.getCause() instanceof CanceledFailure)) { + throw failure; + } + } + return "ok"; + } + } + + @ServiceImpl(service = OverrideCancelNexusService.class) + public class OverriddenCancelNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return new TemporalOperationHandler( + (context, client, input) -> + client.startActivity( + HeartbeatingActivity.class, + HeartbeatingActivity::process, + input, + StartActivityOptions.newBuilder() + .setId("act-override-" + context.getRequestId()) + .setTaskQueue(input) + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .setHeartbeatTimeout(Duration.ofSeconds(2)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build())) { + @Override + protected void cancelActivityExecution( + TemporalOperationCancelContext ctx, CancelActivityExecutionInput input) { + customCancelInvoked.set(true); + // Intentionally do NOT invoke default cancel; the activity will self-terminate via + // startToCloseTimeout. + } + }; + } + } +} From aa093f139c715418ecf1b6a9bf0c3c08579f54b6 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 16 Jun 2026 10:18:28 -0700 Subject: [PATCH 2/2] some clean up --- .../internal/nexus/OperationToken.java | 28 +++++++++++++++ .../internal/nexus/OperationTokenUtil.java | 25 ++++++++++++-- .../nexus/CancelActivityExecutionInput.java | 17 +++++++++- .../nexus/TemporalNexusClientImpl.java | 34 ++++++++++++++----- .../nexus/TemporalOperationHandler.java | 5 +-- .../internal/nexus/WorkflowRunTokenTest.java | 24 +++++++++++++ .../nexus/TemporalNexusClientImplTest.java | 21 ++++++------ 7 files changed, 131 insertions(+), 23 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java index 3593138a3e..a5f7ec9972 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java @@ -24,17 +24,23 @@ public class OperationToken { @JsonInclude(JsonInclude.Include.NON_NULL) private final String activityId; + @JsonProperty("rid") + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String runId; + @JsonCreator public OperationToken( @JsonProperty("t") Integer type, @JsonProperty("ns") String namespace, @JsonProperty("wid") String workflowId, @JsonProperty("aid") String activityId, + @JsonProperty("rid") String runId, @JsonProperty("v") Integer version) { this.type = OperationTokenType.fromValue(type); this.namespace = namespace; this.workflowId = workflowId; this.activityId = activityId; + this.runId = runId; this.version = version; } @@ -43,15 +49,26 @@ public OperationToken(OperationTokenType type, String namespace, String workflow this.namespace = namespace; this.workflowId = workflowId; this.activityId = null; + this.runId = null; this.version = null; } public OperationToken( OperationTokenType type, String namespace, String workflowId, String activityId) { + this(type, namespace, workflowId, activityId, null); + } + + public OperationToken( + OperationTokenType type, + String namespace, + String workflowId, + String activityId, + String runId) { this.type = type; this.namespace = namespace; this.workflowId = workflowId; this.activityId = activityId; + this.runId = runId; this.version = null; } @@ -74,4 +91,15 @@ public String getWorkflowId() { public String getActivityId() { return activityId; } + + /** + * Returns the activity run ID embedded in the token, or {@code null} if absent. + * + *

Run ID is only present on activity-execution tokens that were generated AFTER the start + * activity RPC completed (so the run ID was known). Tokens written into the Nexus operation-token + * callback header are generated before that point and therefore do not carry a run ID. + */ + public String getRunId() { + return runId; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java index b2654a4f62..447eb04f19 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java @@ -107,12 +107,33 @@ public static String generateWorkflowRunOperationToken(String workflowId, String return encoder.encodeToString(json.getBytes()); } - /** Generate an activity execution operation token from an activity ID and namespace. */ + /** + * Generate an activity execution operation token from an activity ID and namespace. + * + *

This overload omits the run ID. Use it when writing the token into the Nexus operation-token + * callback header — that token is generated before the start RPC completes, so the run ID is not + * yet known. + */ public static String generateActivityExecutionOperationToken(String activityId, String namespace) throws JsonProcessingException { + return generateActivityExecutionOperationToken(activityId, null, namespace); + } + + /** + * Generate an activity execution operation token from an activity ID, run ID, and namespace. The + * {@code runId} is included only when non-null. + * + *

This overload is used for the operation token returned to the Nexus caller from a start + * operation — at that point the start RPC has completed and the run ID is known. The header token + * written into the activity completion callback must NOT carry a run ID; use {@link + * #generateActivityExecutionOperationToken(String, String)} for that path. + */ + public static String generateActivityExecutionOperationToken( + String activityId, String runId, String namespace) throws JsonProcessingException { String json = ow.writeValueAsString( - new OperationToken(OperationTokenType.ACTIVITY_EXECUTION, namespace, null, activityId)); + new OperationToken( + OperationTokenType.ACTIVITY_EXECUTION, namespace, null, activityId, runId)); return encoder.encodeToString(json.getBytes()); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java b/temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java index 511f9fc012..6794033817 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java @@ -2,6 +2,7 @@ import io.temporal.common.Experimental; import java.util.Objects; +import javax.annotation.Nullable; /** * Input to {@link TemporalOperationHandler#cancelActivityExecution} describing the activity @@ -11,13 +12,27 @@ public final class CancelActivityExecutionInput { private final String activityId; + private final @Nullable String runId; - public CancelActivityExecutionInput(String activityId) { + public CancelActivityExecutionInput(String activityId, @Nullable String runId) { this.activityId = Objects.requireNonNull(activityId); + this.runId = runId; } /** Returns the activity ID extracted from the operation token. */ public String getActivityId() { return activityId; } + + /** + * Returns the activity run ID extracted from the operation token, or {@code null} if absent. + * + *

Run ID is only present on operation tokens that were generated by this SDK AFTER the start + * activity RPC completed. Tokens originating from the activity completion callback header do not + * carry a run ID. + */ + @Nullable + public String getRunId() { + return runId; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java index 8e84883381..27ff11d205 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java @@ -1,5 +1,6 @@ package io.temporal.nexus; +import com.fasterxml.jackson.core.JsonProcessingException; import io.nexusrpc.handler.HandlerException; import io.nexusrpc.handler.OperationContext; import io.nexusrpc.handler.OperationStartDetails; @@ -17,6 +18,7 @@ import io.temporal.internal.client.NexusStartWorkflowResponse; import io.temporal.internal.nexus.NexusStartActivityHelper; import io.temporal.internal.nexus.NexusStartWorkflowHelper; +import io.temporal.internal.nexus.OperationTokenUtil; import io.temporal.internal.util.MethodExtractor; import io.temporal.workflow.Functions; import java.lang.reflect.Method; @@ -257,7 +259,7 @@ public TemporalOperationResult startWorkflow( } private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { - claimAsyncSlot(); + markAsyncOperationStarted(); try { NexusStartWorkflowResponse response = NexusStartWorkflowHelper.startWorkflowAndAttachLinks( @@ -436,7 +438,7 @@ public TemporalOperationResult startActivity( private TemporalOperationResult startActivityImpl( String activityType, List args, StartActivityOptions options) { - claimAsyncSlot(); + markAsyncOperationStarted(); try { NexusStartActivityResponse response = NexusStartActivityHelper.startActivityAndAttachLinks( @@ -473,18 +475,34 @@ private TemporalOperationResult startActivityImpl( .build()); ActivityClientCallsInterceptor.StartActivityOutput out = ((ActivityClientInternal) activityClient).getInvoker().startActivity(input); - // The invoker generated and injected the token into the callback headers before - // the start RPC; read it back from the output instead of regenerating it. - String token = out.getNexusOperationToken(); - if (token == null) { + // The invoker generated and injected the runId-free token into the callback + // headers before the start RPC fired. The header token cannot include a run ID + // because the run ID isn't known until after the start RPC returns. The operation + // token returned to the Nexus caller can — and should — include it, so it's + // regenerated here from the same activity ID + the run ID the start RPC produced. + String headerToken = out.getNexusOperationToken(); + if (headerToken == null) { throw new HandlerException( HandlerException.ErrorType.INTERNAL, "invoker did not return a Nexus operation token for activity start with callback", new IllegalStateException( "nexusOperationToken is null on StartActivityOutput when CompletionCallback was set")); } + String returnToken; + try { + returnToken = + OperationTokenUtil.generateActivityExecutionOperationToken( + out.getActivityId(), + out.getActivityRunId(), + client.getOptions().getNamespace()); + } catch (JsonProcessingException e) { + throw new HandlerException( + HandlerException.ErrorType.INTERNAL, + "failed to generate activity operation token", + e); + } return new NexusStartActivityResponse( - out.getActivityId(), out.getActivityRunId(), token); + out.getActivityId(), out.getActivityRunId(), returnToken); }); return TemporalOperationResult.async(response.getOperationToken()); } catch (Throwable t) { @@ -495,7 +513,7 @@ private TemporalOperationResult startActivityImpl( } } - private void claimAsyncSlot() { + private void markAsyncOperationStarted() { if (!asyncOperationStarted.compareAndSet(false, true)) { throw new HandlerException( HandlerException.ErrorType.BAD_REQUEST, diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java index 1c7e57f402..584d702229 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -110,7 +110,8 @@ public final void cancel(OperationContext ctx, OperationCancelDetails details) { break; case ACTIVITY_EXECUTION: cancelActivityExecution( - cancelContext, new CancelActivityExecutionInput(token.getActivityId())); + cancelContext, + new CancelActivityExecutionInput(token.getActivityId(), token.getRunId())); break; default: throw new HandlerException( @@ -154,6 +155,6 @@ protected void cancelActivityExecution( .setDataConverter(wc.getOptions().getDataConverter()) .setIdentity(wc.getOptions().getIdentity()) .build()); - ac.getHandle(input.getActivityId(), null).cancel(); + ac.getHandle(input.getActivityId(), input.getRunId()).cancel(); } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java index 84caf9e483..ad55f7a3ab 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java @@ -100,12 +100,36 @@ public void roundTripActivityExecutionToken() throws JsonProcessingException { Assert.assertEquals("act-1", token.getActivityId()); Assert.assertEquals("ns", token.getNamespace()); Assert.assertNull(token.getWorkflowId()); + Assert.assertNull(token.getRunId()); Assert.assertNull(token.getVersion()); // Also exercise the symmetric activityId loader. Assert.assertEquals("act-1", OperationTokenUtil.loadActivityIdFromOperationToken(encoded)); } + @Test + public void roundTripActivityExecutionTokenWithRunId() throws JsonProcessingException { + String encoded = + OperationTokenUtil.generateActivityExecutionOperationToken("act-1", "run-1", "ns"); + OperationToken token = OperationTokenUtil.loadOperationToken(encoded); + Assert.assertEquals(OperationTokenType.ACTIVITY_EXECUTION, token.getType()); + Assert.assertEquals("act-1", token.getActivityId()); + Assert.assertEquals("run-1", token.getRunId()); + Assert.assertEquals("ns", token.getNamespace()); + Assert.assertNull(token.getWorkflowId()); + Assert.assertNull(token.getVersion()); + } + + @Test + public void activityExecutionTokenOmitsRunIdWhenNull() throws JsonProcessingException { + // The header-token path passes runId=null and must produce a payload byte-identical to the + // two-arg overload — the runId-aware overload is the *only* one used in code now. + String withRunId = + OperationTokenUtil.generateActivityExecutionOperationToken("act-1", null, "ns"); + String withoutRunId = OperationTokenUtil.generateActivityExecutionOperationToken("act-1", "ns"); + Assert.assertEquals(withoutRunId, withRunId); + } + @Test public void workflowRunTokenBytesByteIdenticalSnapshot() throws JsonProcessingException { String encoded = OperationTokenUtil.generateWorkflowRunOperationToken("wf-1", "ns"); diff --git a/temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java b/temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java index 0e383d036f..020124f6ff 100644 --- a/temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java @@ -20,10 +20,10 @@ import org.junit.Test; /** - * Pure unit tests for {@link TemporalNexusClientImpl#claimAsyncSlot()} semantics. These run without - * a Temporal server (no {@link io.temporal.testing.internal.SDKTestWorkflowRule}). + * Pure unit tests for {@link TemporalNexusClientImpl#markAsyncOperationStarted()} semantics. These + * run without a Temporal server (no {@link io.temporal.testing.internal.SDKTestWorkflowRule}). * - *

The {@code claimAsyncSlot} guard fires as the very first statement in both {@code + *

The {@code markAsyncOperationStarted} guard fires as the very first statement in both {@code * startActivityImpl} and {@code invokeAndReturn}, so the second call always throws {@link * HandlerException}({@link HandlerException.ErrorType#BAD_REQUEST}) regardless of whether the first * call's downstream RPC succeeded. @@ -80,22 +80,23 @@ public void doubleStartActivity_secondCallThrowsBadRequest() { .setStartToCloseTimeout(Duration.ofSeconds(10)) .build(); - // First call: claimAsyncSlot() succeeds (sets flag), RPC may throw — we don't care. + // First call: markAsyncOperationStarted() succeeds (sets flag), RPC may throw — we don't care. try { client.startActivity(TestActivity.class, TestActivity::doSomething, options); } catch (HandlerException e) { // If a HandlerException leaks out of the first call it must NOT be BAD_REQUEST - // from claimAsyncSlot — that would mean the flag was already set before setUp. + // from markAsyncOperationStarted — that would mean the flag was already set before setUp. Assert.assertNotEquals( - "First startActivity must not fail with BAD_REQUEST (claimAsyncSlot guard)", + "First startActivity must not fail with BAD_REQUEST (markAsyncOperationStarted guard)", HandlerException.ErrorType.BAD_REQUEST, e.getErrorType()); } catch (Exception ignored) { // Any other exception from the RPC layer is expected; the important thing is - // claimAsyncSlot already ran and set asyncOperationStarted = true. + // markAsyncOperationStarted already ran and set asyncOperationStarted = true. } - // Second call: claimAsyncSlot() sees the flag and must throw BAD_REQUEST immediately. + // Second call: markAsyncOperationStarted() sees the flag and must throw BAD_REQUEST + // immediately. HandlerException ex = Assert.assertThrows( HandlerException.class, @@ -125,12 +126,12 @@ public void doubleStartWorkflow_secondCallThrowsBadRequest() { WorkflowOptions options = WorkflowOptions.newBuilder().setWorkflowId("wf-1").setTaskQueue(TASK_QUEUE).build(); - // First call: claimAsyncSlot() succeeds, downstream RPC may throw — we don't care. + // First call: markAsyncOperationStarted() succeeds, downstream RPC may throw — we don't care. try { client.startWorkflow(BlockingWorkflow.class, BlockingWorkflow::execute, "input", options); } catch (HandlerException e) { Assert.assertNotEquals( - "First startWorkflow must not fail with BAD_REQUEST (claimAsyncSlot guard)", + "First startWorkflow must not fail with BAD_REQUEST (markAsyncOperationStarted guard)", HandlerException.ErrorType.BAD_REQUEST, e.getErrorType()); } catch (Exception ignored) {