diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fea940f214..7bf551c735 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,7 +84,7 @@ jobs: - name: Start CLI server env: - TEMPORAL_CLI_VERSION: 1.7.2-standalone-nexus-operations + TEMPORAL_CLI_VERSION: 1.7.1-standalone-nexus-operations run: | wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz tar -xzf temporal_cli.tar.gz @@ -112,13 +112,11 @@ jobs: --dynamic-config-value frontend.ListWorkersEnabled=true \ --dynamic-config-value frontend.enableCancelWorkerPollsOnShutdown=true \ --dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \ - --dynamic-config-value 'callback.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \ --dynamic-config-value frontend.activityAPIsEnabled=true \ --dynamic-config-value activity.enableStandalone=true \ --dynamic-config-value activity.startDelayEnabled=true \ --dynamic-config-value nexusoperation.enableStandalone=true \ --dynamic-config-value history.enableChasm=true \ - --dynamic-config-value history.enableCHASMSignalBacklinks=true \ --dynamic-config-value history.enableTransitionHistory=true & sleep 10s diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 510471f2e6..896fdc0762 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -102,13 +102,6 @@ public WorkflowStartOutput start(WorkflowStartInput input) { e); } } - // If this start is being issued from inside a Nexus operation handler, stash only the - // forward operation->workflow link from the start response so NexusStartWorkflowHelper can - // attach it to the WorkflowExecutionStarted event. Unlike signal/signalWithStart, start - // deliberately does NOT add a response link here: the operation->workflow relationship is - // already captured by the forward link, so re-adding response.getLink() as a response link - // would duplicate it on the caller's history event. Do not "restore symmetry" by calling - // addResponseLink here. if (CurrentNexusOperationContext.isNexusContext()) { CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink()); } @@ -127,13 +120,6 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { .setRequestId(UUID.randomUUID().toString()) .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); - // If this signal is being issued from inside a Nexus operation handler, forward the inbound - // Nexus task links so the SignalWorkflowExecution history event links back to the caller. - boolean inNexusContext = CurrentNexusOperationContext.isNexusContext(); - if (inNexusContext) { - request.addAllLinks(CurrentNexusOperationContext.get().getRequestLinks()); - } - DataConverter dataConverterWitSignalContext = clientOptions .getDataConverter() @@ -143,12 +129,7 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { Optional inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments()); inputArgs.ifPresent(request::setInput); - SignalWorkflowExecutionResponse response = genericClient.signal(request.build()); - // Server >=1.31 with EnableCHASMSignalBacklinks returns a response link pointing at the signal - // event; older servers leave it unset. Propagate when present. - if (inNexusContext && response.hasLink()) { - CurrentNexusOperationContext.get().addResponseLink(response.getLink()); - } + genericClient.signal(request.build()); return new WorkflowSignalOutput(); } @@ -167,28 +148,17 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu Optional signalInput = dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments()); - SignalWithStartWorkflowExecutionRequest.Builder requestBuilder = - requestsHelper.newSignalWithStartWorkflowExecutionRequest( - startRequest, input.getSignalName(), signalInput.orElse(null)); - // If this signalWithStart is being issued from inside a Nexus operation handler, forward - // the inbound Nexus task links so both the WorkflowExecutionStarted and - // WorkflowExecutionSignaled events on the callee link back to the caller. - boolean inNexusContext = CurrentNexusOperationContext.isNexusContext(); - if (inNexusContext) { - requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getRequestLinks()); - } - SignalWithStartWorkflowExecutionRequest request = requestBuilder.build(); + SignalWithStartWorkflowExecutionRequest request = + requestsHelper + .newSignalWithStartWorkflowExecutionRequest( + startRequest, input.getSignalName(), signalInput.orElse(null)) + .build(); SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request); WorkflowExecution execution = WorkflowExecution.newBuilder() .setRunId(response.getRunId()) .setWorkflowId(request.getWorkflowId()) .build(); - // Server >=1.31 with EnableCHASMSignalBacklinks returns a response link pointing at the signal - // event; older servers leave it unset. Propagate when present. - if (inNexusContext && response.hasSignalLink()) { - CurrentNexusOperationContext.get().addResponseLink(response.getSignalLink()); - } // TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask. // We should wire it when it's implemented server-side. return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution)); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java index 23932104fe..a81fa253a0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java @@ -10,7 +10,7 @@ public interface GenericWorkflowClient { StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request); - SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request); + void signal(SignalWorkflowExecutionRequest request); SignalWithStartWorkflowExecutionResponse signalWithStart( SignalWithStartWorkflowExecutionRequest request); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java index a40e66bc4a..f74d1b6e37 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java @@ -61,13 +61,13 @@ private static Map tagsForStartWorkflow(StartWorkflowExecutionRe } @Override - public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) { + public void signal(SignalWorkflowExecutionRequest request) { Map tags = new ImmutableMap.Builder(1) .put(MetricsTag.SIGNAL_NAME, request.getSignalName()) .build(); Scope scope = metricsScope.tagged(tags); - return grpcRetryer.retryWithResult( + grpcRetryer.retry( () -> service .blockingStub() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java index 0308683857..d7306ea968 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java @@ -6,10 +6,6 @@ import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; import io.temporal.nexus.NexusOperationContext; import io.temporal.nexus.NexusOperationInfo; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import javax.annotation.Nonnull; public class InternalNexusOperationContext { private final String namespace; @@ -18,25 +14,7 @@ public class InternalNexusOperationContext { private final Scope metricScope; private final WorkflowClient client; NexusOperationOutboundCallsInterceptor outboundCalls; - // Link returned by the StartWorkflowExecution response when the operation is backed by a workflow - // (workflow-run operations). Read by NexusStartWorkflowHelper to attach the forward - // operation->workflow link, fabricating a WORKFLOW_EXECUTION_STARTED link when the server omits - // one. Distinct from the response links below. Link startWorkflowResponseLink; - // Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the - // workflow client can attach them to the outgoing requests it issues (e.g. signal, - // signalWithStart) via the request's links field. - private List requestLinks = Collections.emptyList(); - // Links returned by outbound RPCs the operation handler issues (such as - // SignalWorkflowExecutionResponse.link or SignalWithStartWorkflowExecutionResponse.signal_link). - // One entry per outbound RPC that returned a link. Drained - // by the task handler when building StartOperationResponse so each RPC the handler issued gets a - // corresponding link on the caller workflow's history event. - // - // A handler may issue RPCs from multiple threads, so every read and write of this list is guarded - // by responseLinksLock and getResponseLinks() returns a defensive copy taken under the lock. - private final Object responseLinksLock = new Object(); - private final List responseLinks = new ArrayList<>(); public InternalNexusOperationContext( String namespace, @@ -82,19 +60,6 @@ public NexusOperationContext getUserFacingContext() { return new NexusOperationContextImpl(); } - /** - * Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached - * to RPCs issued by the operation handler. - */ - public void setRequestLinks(List links) { - this.requestLinks = links == null ? Collections.emptyList() : links; - } - - /** Links from the inbound Nexus task; empty if none. */ - public @Nonnull List getRequestLinks() { - return Collections.unmodifiableList(requestLinks); - } - public void setStartWorkflowResponseLink(Link link) { this.startWorkflowResponseLink = link; } @@ -103,32 +68,6 @@ public Link getStartWorkflowResponseLink() { return startWorkflowResponseLink; } - /** - * Append a response link returned by an outbound RPC the operation handler issued (e.g. signal, - * signalWithStart, etc). The task handler drains the list when building the operation's - * StartOperationResponse. - */ - public void addResponseLink(Link link) { - if (link != null) { - synchronized (responseLinksLock) { - responseLinks.add(link); - } - } - } - - /** - * Response links from every outbound RPC the handler issued. Returned as an unmodifiable view; - * callers must not attempt to mutate. Entries are accumulated while the operation handler runs - * (the call that flows through {@link - * io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor#startOperation}) and are - * drained afterward by the task handler when building the StartOperationResponse. - */ - public @Nonnull List getResponseLinks() { - synchronized (responseLinksLock) { - return Collections.unmodifiableList(new ArrayList<>(responseLinks)); - } - } - private class NexusOperationContextImpl implements NexusOperationContext { @Override public NexusOperationInfo getInfo() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 0fac5263a9..7f10ba8c62 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -20,7 +20,6 @@ import io.temporal.failure.CanceledFailure; import io.temporal.failure.TemporalFailure; import io.temporal.internal.common.InternalUtils; -import io.temporal.internal.common.LinkConverter; import io.temporal.internal.common.NexusUtil; import io.temporal.internal.worker.NexusTask; import io.temporal.internal.worker.NexusTaskHandler; @@ -285,10 +284,6 @@ private StartOperationResponse handleStartOperation( .setCallbackUrl(task.getCallback()) .setRequestId(task.getRequestId()); task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader); - // Stash the inbound links in common.v1.Link form on the operation context so the RPCs the - // handler issues (e.g. signal, signalWithStart, etc) can attach them to their - // request's links field. - List inboundCommonLinks = new ArrayList<>(); task.getLinksList() .forEach( link -> { @@ -301,23 +296,7 @@ private StartOperationResponse handleStartOperation( "Invalid link URL: " + link.getUrl(), e); } - // LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of - // other shapes (e.g. non-temporal URLs) come back null and are intentionally not - // forwarded onto the RPCs the handler issues, which require the WorkflowEvent - // variant. Log so a debugging session can see what was dropped. - io.temporal.api.common.v1.Link commonLink = - LinkConverter.nexusLinkToWorkflowEvent(link); - if (commonLink != null) { - inboundCommonLinks.add(commonLink); - } else { - log.warn( - "Dropping inbound Nexus link from outbound link propagation: type='{}'," - + " url='{}' (not a parseable temporal WorkflowEvent link)", - link.getType(), - link.getUrl()); - } }); - CurrentNexusOperationContext.get().setRequestLinks(inboundCommonLinks); HandlerInputContent.Builder input = HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput()); @@ -328,28 +307,10 @@ private StartOperationResponse handleStartOperation( try { OperationStartResult result = startOperation(context, operationStartDetails.build(), input.build()); - // If any RPCs the handler issued (e.g. signal, signalWithStart, etc) returned - // response links, propagate them to the caller so the caller workflow's history event links - // to each event on the callee. Same set of response links applies to both sync and async - // response variants. - List responseLinks = new ArrayList<>(); - for (io.temporal.api.common.v1.Link responseLink : - CurrentNexusOperationContext.get().getResponseLinks()) { - if (!responseLink.hasWorkflowEvent()) { - continue; - } - io.temporal.api.nexus.v1.Link converted = - LinkConverter.workflowEventToNexusLink(responseLink.getWorkflowEvent()); - if (converted != null) { - responseLinks.add(converted); - } - } - if (result.isSync()) { startResponseBuilder.setSyncSuccess( StartOperationResponse.Sync.newBuilder() .setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes())) - .addAllLinks(responseLinks) .build()); } else { startResponseBuilder.setAsyncSuccess( @@ -365,7 +326,6 @@ private StartOperationResponse handleStartOperation( .setUrl(link.getUri().toString()) .build()) .collect(Collectors.toList())) - .addAllLinks(responseLinks) .build()); } } catch (OperationException e) { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java b/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java deleted file mode 100644 index 85be43a8ce..0000000000 --- a/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java +++ /dev/null @@ -1,307 +0,0 @@ -package io.temporal.internal.client; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.uber.m3.tally.RootScopeBuilder; -import com.uber.m3.tally.Scope; -import io.temporal.api.common.v1.Link; -import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.api.enums.v1.EventType; -import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest; -import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse; -import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionRequest; -import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionResponse; -import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest; -import io.temporal.api.workflowservice.v1.StartWorkflowExecutionResponse; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowClientOptions; -import io.temporal.client.WorkflowOptions; -import io.temporal.common.interceptors.Header; -import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.WorkflowSignalInput; -import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.WorkflowSignalWithStartInput; -import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.WorkflowStartInput; -import io.temporal.internal.client.external.GenericWorkflowClient; -import io.temporal.internal.nexus.CurrentNexusOperationContext; -import io.temporal.internal.nexus.InternalNexusOperationContext; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -/** - * Unit tests for {@link RootWorkflowClientInvoker#signal} link propagation in and out of the Nexus - * operation context. These run against mocked dependencies and exercise the code paths that the - * integration tests in {@code SignalOperationLinkingTest} can only cover when a real flag-enabled - * server is available. - */ -public class RootWorkflowClientInvokerLinkPropagationTest { - - private static final String NAMESPACE = "test-namespace"; - private static final String WORKFLOW_ID = "wf-target"; - - private GenericWorkflowClient genericClient; - private RootWorkflowClientInvoker invoker; - private InternalNexusOperationContext nexusCtx; - - @Before - public void setUp() { - genericClient = mock(GenericWorkflowClient.class); - invoker = - new RootWorkflowClientInvoker( - genericClient, - WorkflowClientOptions.newBuilder() - .setNamespace(NAMESPACE) - .validateAndBuildWithDefaults(), - new WorkerFactoryRegistry()); - Scope metricsScope = new RootScopeBuilder().reportEvery(com.uber.m3.util.Duration.ofMillis(10)); - nexusCtx = - new InternalNexusOperationContext( - NAMESPACE, "tq", "endpoint", metricsScope, mock(WorkflowClient.class)); - CurrentNexusOperationContext.set(nexusCtx); - } - - @After - public void tearDown() { - CurrentNexusOperationContext.unset(); - } - - /** - * Happy path against a flag-enabled server: inbound nexus links are forwarded onto the - * SignalWorkflowExecutionRequest, and the response link is captured back onto the operation - * context. - */ - @Test - public void signalForwardsInboundLinksAndCapturesResponseLink() { - Link inboundLink = - workflowEventLink( - "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); - nexusCtx.setRequestLinks(Collections.singletonList(inboundLink)); - - Link responseLink = - workflowEventLink( - WORKFLOW_ID, "target-run", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); - SignalWorkflowExecutionResponse response = - SignalWorkflowExecutionResponse.newBuilder().setLink(responseLink).build(); - when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))).thenReturn(response); - - invoker.signal(newSignalInput()); - - // Forward direction: the request the SDK sent carries the inbound link. - ArgumentCaptor captor = - ArgumentCaptor.forClass(SignalWorkflowExecutionRequest.class); - org.mockito.Mockito.verify(genericClient).signal(captor.capture()); - SignalWorkflowExecutionRequest sent = captor.getValue(); - Assert.assertEquals("request should carry the single inbound link", 1, sent.getLinksCount()); - Assert.assertEquals(inboundLink, sent.getLinks(0)); - - // Backward direction: the response's link is now on the context for the task handler to read. - List captured = nexusCtx.getResponseLinks(); - Assert.assertEquals("expected one captured response link", 1, captured.size()); - Assert.assertEquals(responseLink, captured.get(0)); - } - - /** - * Older-server compatibility: the server returns a response without {@code link} set. The SDK - * must not crash and must leave the operation context's response link list empty. - */ - @Test - public void signalAgainstOlderServerCapturesNoResponseLink() { - Link inboundLink = - workflowEventLink( - "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); - nexusCtx.setRequestLinks(Collections.singletonList(inboundLink)); - - // Pre-1.31 server / flag-off server: response has no link. - SignalWorkflowExecutionResponse response = SignalWorkflowExecutionResponse.getDefaultInstance(); - when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))).thenReturn(response); - - invoker.signal(newSignalInput()); - - // Forward direction still works regardless of server version. - ArgumentCaptor captor = - ArgumentCaptor.forClass(SignalWorkflowExecutionRequest.class); - org.mockito.Mockito.verify(genericClient).signal(captor.capture()); - Assert.assertEquals(1, captor.getValue().getLinksCount()); - - // Backward direction: no response link captured because the server didn't send one. - Assert.assertTrue( - "expected no captured response link when server returned no link", - nexusCtx.getResponseLinks().isEmpty()); - } - - /** - * Multi-signal: two signal RPCs in a row each contribute a response link; both must be captured - * in order on the context, ready for the task handler to drain into the operation response. - */ - @Test - public void multipleSignalsAccumulateAllResponseLinks() { - Link firstResponseLink = - workflowEventLink("callee-a", "run-a", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); - Link secondResponseLink = - workflowEventLink("callee-b", "run-b", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); - when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))) - .thenReturn(SignalWorkflowExecutionResponse.newBuilder().setLink(firstResponseLink).build()) - .thenReturn( - SignalWorkflowExecutionResponse.newBuilder().setLink(secondResponseLink).build()); - - invoker.signal(newSignalInput()); - invoker.signal(newSignalInput()); - - List captured = nexusCtx.getResponseLinks(); - Assert.assertEquals( - "expected one response link per signal call", - Arrays.asList(firstResponseLink, secondResponseLink), - captured); - } - - /** - * Happy-path mirror of {@link #signalForwardsInboundLinksAndCapturesResponseLink} but for {@code - * signalWithStart}. The forward direction must attach inbound links to {@link - * SignalWithStartWorkflowExecutionRequest#getLinksList}, and the backward direction must capture - * {@code response.signal_link} via the same response link path. Different proto field name - * ({@code signal_link} vs {@code link}) and different code path inside {@link - * io.temporal.internal.client.RootWorkflowClientInvoker#signalWithStart} — a regression in only - * one branch would otherwise pass the plain-signal tests. - */ - @Test - public void signalWithStartForwardsInboundLinksAndCapturesResponseLink() { - Link inboundLink = - workflowEventLink( - "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); - nexusCtx.setRequestLinks(Collections.singletonList(inboundLink)); - - Link responseLink = - workflowEventLink( - WORKFLOW_ID, "target-run", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); - SignalWithStartWorkflowExecutionResponse response = - SignalWithStartWorkflowExecutionResponse.newBuilder() - .setRunId("target-run") - .setSignalLink(responseLink) - .build(); - when(genericClient.signalWithStart(any(SignalWithStartWorkflowExecutionRequest.class))) - .thenReturn(response); - - invoker.signalWithStart(newSignalWithStartInput()); - - // Forward direction: the SignalWithStartWorkflowExecutionRequest carries the inbound link. - ArgumentCaptor captor = - ArgumentCaptor.forClass(SignalWithStartWorkflowExecutionRequest.class); - org.mockito.Mockito.verify(genericClient).signalWithStart(captor.capture()); - SignalWithStartWorkflowExecutionRequest sent = captor.getValue(); - Assert.assertEquals("request should carry the single inbound link", 1, sent.getLinksCount()); - Assert.assertEquals(inboundLink, sent.getLinks(0)); - - // Backward direction: response.signal_link is on the context for the task handler to read. - List captured = nexusCtx.getResponseLinks(); - Assert.assertEquals("expected one captured response link", 1, captured.size()); - Assert.assertEquals(responseLink, captured.get(0)); - } - - /** - * Mixed-RPC accumulation: a handler that issues one signal and one signalWithStart against the - * same context must end up with both response links captured, in call order. Guards against - * regressions where one of the two code paths stops appending to the same list. - */ - @Test - public void mixedSignalAndSignalWithStartAccumulateAllResponseLinks() { - Link signalResponseLink = - workflowEventLink("callee-s", "run-s", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); - Link signalWithStartResponseLink = - workflowEventLink( - "callee-sws", "run-sws", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); - when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))) - .thenReturn( - SignalWorkflowExecutionResponse.newBuilder().setLink(signalResponseLink).build()); - when(genericClient.signalWithStart(any(SignalWithStartWorkflowExecutionRequest.class))) - .thenReturn( - SignalWithStartWorkflowExecutionResponse.newBuilder() - .setRunId("run-sws") - .setSignalLink(signalWithStartResponseLink) - .build()); - - invoker.signal(newSignalInput()); - invoker.signalWithStart(newSignalWithStartInput()); - - Assert.assertEquals( - "expected one response link each from signal and signalWithStart, in call order", - Arrays.asList(signalResponseLink, signalWithStartResponseLink), - nexusCtx.getResponseLinks()); - } - - /** - * Post-rebase start contract: a plain {@code start()} issued from inside a Nexus operation - * handler captures only the FORWARD operation->workflow link (via {@code - * setStartWorkflowResponseLink}) and deliberately does NOT add a response link (unlike - * signal/signalWithStart). Replaces the two start-link tests removed by the rebase and guards - * against a regression that re-adds a response link on the start path. - */ - @Test - public void startSetsForwardLinkOnlyAndCapturesNoResponseLink() { - Link startResponseLink = - workflowEventLink( - WORKFLOW_ID, "target-run", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED); - StartWorkflowExecutionResponse response = - StartWorkflowExecutionResponse.newBuilder() - .setRunId("target-run") - .setLink(startResponseLink) - .build(); - when(genericClient.start(any(StartWorkflowExecutionRequest.class))).thenReturn(response); - - invoker.start(newStartInput()); - - // Forward direction: the start response link is stashed for NexusStartWorkflowHelper to read. - Assert.assertEquals( - "expected the forward start link on the context", - startResponseLink, - nexusCtx.getStartWorkflowResponseLink()); - - // Backward direction: start must not add a response link. - Assert.assertTrue( - "expected no response link captured on the start path", - nexusCtx.getResponseLinks().isEmpty()); - } - - // ── helpers ────────────────────────────────────────────────────────────────────────────── - - private static WorkflowSignalInput newSignalInput() { - return new WorkflowSignalInput( - WorkflowExecution.newBuilder().setWorkflowId(WORKFLOW_ID).build(), - "test-signal", - Header.empty(), - new Object[] {"payload"}); - } - - private static WorkflowStartInput newStartInput() { - WorkflowOptions options = - WorkflowOptions.newBuilder().setTaskQueue("tq").setDisableEagerExecution(true).build(); - return new WorkflowStartInput( - WORKFLOW_ID, "TestWorkflow", Header.empty(), new Object[] {}, options); - } - - private static WorkflowSignalWithStartInput newSignalWithStartInput() { - WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue("tq").build(); - WorkflowStartInput startInput = - new WorkflowStartInput( - WORKFLOW_ID, "TestWorkflow", Header.empty(), new Object[] {}, options); - return new WorkflowSignalWithStartInput( - startInput, "test-signal", new Object[] {"signal-payload"}); - } - - private static Link workflowEventLink(String workflowId, String runId, EventType eventType) { - return Link.newBuilder() - .setWorkflowEvent( - Link.WorkflowEvent.newBuilder() - .setNamespace(NAMESPACE) - .setWorkflowId(workflowId) - .setRunId(runId) - .setEventRef( - Link.WorkflowEvent.EventReference.newBuilder().setEventType(eventType))) - .build(); - } -} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java index 8649cf7393..ad7c628e98 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java @@ -7,14 +7,10 @@ import com.uber.m3.tally.Scope; import com.uber.m3.util.Duration; import io.nexusrpc.Header; -import io.nexusrpc.OperationException; import io.nexusrpc.handler.*; -import io.temporal.api.common.v1.Link; import io.temporal.api.common.v1.Payload; -import io.temporal.api.enums.v1.EventType; import io.temporal.api.nexus.v1.Request; import io.temporal.api.nexus.v1.StartOperationRequest; -import io.temporal.api.nexus.v1.StartOperationResponse; import io.temporal.api.workflowservice.v1.PollNexusTaskQueueResponse; import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; @@ -161,220 +157,6 @@ public void startAsyncSyncOperation() throws TimeoutException { "test id", result.getResponse().getStartOperation().getAsyncSuccess().getOperationToken()); } - /** - * Verify that signal response links stashed on the {@link InternalNexusOperationContext} during a - * handler invocation are merged into the resulting {@code StartOperationResponse.Async} via - * {@link io.temporal.internal.common.LinkConverter}. No server required. - */ - @Test - public void asyncResponseIncludesSignalResponseLinks() throws TimeoutException { - WorkflowClient client = mock(WorkflowClient.class); - NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl( - client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); - nexusTaskHandlerImpl.registerNexusServiceImplementations( - new Object[] {new ResponseLinkStashingAsyncServiceImpl()}); - nexusTaskHandlerImpl.start(); - - PollNexusTaskQueueResponse.Builder task = - PollNexusTaskQueueResponse.newBuilder() - .setRequest( - Request.newBuilder() - .setStartOperation( - StartOperationRequest.newBuilder() - .setOperation("operation") - .setService("TestNexusService1") - .setPayload(dataConverter.toPayload("op-token").get()) - .build())); - - NexusTaskHandler.Result result = - nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); - - Assert.assertNull(result.getHandlerException()); - StartOperationResponse.Async async = result.getResponse().getStartOperation().getAsyncSuccess(); - Assert.assertEquals("op-token", async.getOperationToken()); - Assert.assertEquals( - "expected one signal response link on the async response", 1, async.getLinksCount()); - // The response link was stashed as a WorkflowEvent for callee workflowId "callee-wf"; the - // response should contain a temporal:// URL referencing that workflow. - Assert.assertTrue( - "expected response link URL to reference the callee workflow, got: " - + async.getLinks(0).getUrl(), - async.getLinks(0).getUrl().contains("callee-wf")); - } - - /** - * Same as {@link #asyncResponseIncludesSignalResponseLinks} but the handler returns sync. Guards - * against the sync and async builders drifting (both must call {@code - * addAllLinks(responseLinks)}). - */ - @Test - public void syncResponseIncludesSignalResponseLinks() throws TimeoutException { - WorkflowClient client = mock(WorkflowClient.class); - NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl( - client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); - nexusTaskHandlerImpl.registerNexusServiceImplementations( - new Object[] {new ResponseLinkStashingSyncServiceImpl()}); - nexusTaskHandlerImpl.start(); - - PollNexusTaskQueueResponse.Builder task = - PollNexusTaskQueueResponse.newBuilder() - .setRequest( - Request.newBuilder() - .setStartOperation( - StartOperationRequest.newBuilder() - .setOperation("operation") - .setService("TestNexusService1") - .setPayload(dataConverter.toPayload("input").get()) - .build())); - - NexusTaskHandler.Result result = - nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); - - Assert.assertNull(result.getHandlerException()); - StartOperationResponse.Sync sync = result.getResponse().getStartOperation().getSyncSuccess(); - Assert.assertEquals( - "expected one signal response link on the sync response", 1, sync.getLinksCount()); - Assert.assertTrue( - "expected response link URL to reference the callee workflow, got: " - + sync.getLinks(0).getUrl(), - sync.getLinks(0).getUrl().contains("callee-wf")); - } - - /** - * Failure path: a handler that stashes a response link (as a successful signal RPC would) and - * then throws afterwards must NOT leak the captured response link onto the failure response. - * Response links are only drained on the success branch of {@link - * NexusTaskHandlerImpl#handleStartOperation}; the failure branch builds a {@code - * StartOperationResponse.failure} that carries no links. - */ - @Test - public void failureResponseDropsCapturedResponseLinks() throws TimeoutException { - WorkflowClient client = mock(WorkflowClient.class); - NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl( - client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); - nexusTaskHandlerImpl.registerNexusServiceImplementations( - new Object[] {new ResponseLinkStashingThenThrowingServiceImpl()}); - nexusTaskHandlerImpl.start(); - - PollNexusTaskQueueResponse.Builder task = - PollNexusTaskQueueResponse.newBuilder() - .setRequest( - Request.newBuilder() - .setStartOperation( - StartOperationRequest.newBuilder() - .setOperation("operation") - .setService("TestNexusService1") - .setPayload(dataConverter.toPayload("input").get()) - .build())); - - NexusTaskHandler.Result result = - nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); - - Assert.assertNull(result.getHandlerException()); - StartOperationResponse response = result.getResponse().getStartOperation(); - Assert.assertEquals( - "expected the failure response variant", - StartOperationResponse.VariantCase.FAILURE, - response.getVariantCase()); - // The handler captured a response link before throwing; the failure response must not carry it - // (and has no links field at all). - Assert.assertFalse( - "failure response variant should not expose any success-path links", - response.hasSyncSuccess() || response.hasAsyncSuccess()); - } - - /** - * Handler that simulates what a real Nexus operation would do after issuing a signal: stash a - * response link on the operation context, then return an async result. Lets us exercise the - * async-response link merge in {@link NexusTaskHandlerImpl} without standing up a real signal - * RPC. - */ - @ServiceImpl(service = TestNexusServices.TestNexusService1.class) - public class ResponseLinkStashingAsyncServiceImpl { - @OperationImpl - public OperationHandler operation() { - return new OperationHandler() { - @Override - public OperationStartResult start( - OperationContext ctx, OperationStartDetails details, @Nullable String token) { - Link responseLink = - Link.newBuilder() - .setWorkflowEvent( - Link.WorkflowEvent.newBuilder() - .setNamespace(NAMESPACE) - .setWorkflowId("callee-wf") - .setRunId("callee-run-id") - .setEventRef( - Link.WorkflowEvent.EventReference.newBuilder() - .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED))) - .build(); - CurrentNexusOperationContext.get().addResponseLink(responseLink); - return OperationStartResult.async(token); - } - - @Override - public void cancel(OperationContext ctx, OperationCancelDetails details) {} - }; - } - } - - /** Sync mirror of {@link ResponseLinkStashingAsyncServiceImpl}. */ - @ServiceImpl(service = TestNexusServices.TestNexusService1.class) - public class ResponseLinkStashingSyncServiceImpl { - @OperationImpl - public OperationHandler operation() { - return OperationHandler.sync( - (ctx, details, input) -> { - Link responseLink = - Link.newBuilder() - .setWorkflowEvent( - Link.WorkflowEvent.newBuilder() - .setNamespace(NAMESPACE) - .setWorkflowId("callee-wf") - .setRunId("callee-run-id") - .setEventRef( - Link.WorkflowEvent.EventReference.newBuilder() - .setEventType( - EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED))) - .build(); - CurrentNexusOperationContext.get().addResponseLink(responseLink); - return "result"; - }); - } - } - - /** - * Stashes a response link on the operation context (as a successful signal RPC would) and then - * throws an {@link OperationException}, exercising the failure branch of {@link - * NexusTaskHandlerImpl#handleStartOperation}. - */ - @ServiceImpl(service = TestNexusServices.TestNexusService1.class) - public class ResponseLinkStashingThenThrowingServiceImpl { - @OperationImpl - public OperationHandler operation() { - return OperationHandler.sync( - (ctx, details, input) -> { - Link responseLink = - Link.newBuilder() - .setWorkflowEvent( - Link.WorkflowEvent.newBuilder() - .setNamespace(NAMESPACE) - .setWorkflowId("callee-wf") - .setRunId("callee-run-id") - .setEventRef( - Link.WorkflowEvent.EventReference.newBuilder() - .setEventType( - EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED))) - .build(); - CurrentNexusOperationContext.get().addResponseLink(responseLink); - throw OperationException.failed("boom after capturing a response link"); - }); - } - } - @ServiceImpl(service = TestNexusServices.TestNexusService1.class) public class TestNexusServiceImpl { @OperationImpl diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java deleted file mode 100644 index afb91f72d2..0000000000 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java +++ /dev/null @@ -1,423 +0,0 @@ -package io.temporal.workflow.nexus; - -import static org.junit.Assume.assumeTrue; - -import io.nexusrpc.handler.OperationCancelDetails; -import io.nexusrpc.handler.OperationContext; -import io.nexusrpc.handler.OperationHandler; -import io.nexusrpc.handler.OperationImpl; -import io.nexusrpc.handler.OperationStartDetails; -import io.nexusrpc.handler.OperationStartResult; -import io.nexusrpc.handler.ServiceImpl; -import io.temporal.api.enums.v1.EventType; -import io.temporal.api.history.v1.History; -import io.temporal.api.history.v1.HistoryEvent; -import io.temporal.client.BatchRequest; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowOptions; -import io.temporal.client.WorkflowStub; -import io.temporal.nexus.Nexus; -import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.workflow.NexusOperationHandle; -import io.temporal.workflow.NexusOperationOptions; -import io.temporal.workflow.NexusServiceOptions; -import io.temporal.workflow.SignalMethod; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; -import io.temporal.workflow.shared.TestNexusServices; -import io.temporal.workflow.shared.TestWorkflows; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import javax.annotation.Nullable; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Verifies link propagation in both directions when a Nexus operation handler interacts with a - * workflow via signal. Covers three scenarios: - * - *
    - *
  • {@link #testSignalOperationLinks()} — sync handler, two signals (signalWithStart + plain - * signal). - *
  • {@link #testMultiSignalOperationLinks()} — one Nexus operation signals three different - * callees; verifies all three response links land on the caller's single {@code - * NexusOperationCompleted} event. - *
  • {@link #testAsyncSignalOperationLinks()} — handler returns an async result after signaling; - * verifies the response link lands on {@code NexusOperationStarted} (the async response path - * in {@link io.temporal.internal.nexus.NexusTaskHandlerImpl}). - *
- * - *

All tests require Temporal server ≥ 1.31 with {@code EnableCHASMSignalBacklinks=true}; the - * in-memory test server does not implement this path so the class is skipped unless a real server - * is in use. - */ -public class SignalOperationLinkingTest { - - private static final String MODE_SIGNAL_WITH_START = "signalWithStart"; - private static final String MODE_SIGNAL = "signal"; - private static final String MODE_MULTI_SIGNAL_WITH_START = "multi"; - private static final String MODE_ASYNC_SIGNAL_WITH_START = "asyncSignalWithStart"; - - @Rule - public SDKTestWorkflowRule testWorkflowRule = - SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes(SignalCallerWorkflow.class, SignalCalleeWorkflowImpl.class) - .setNexusServiceImplementation(new SignalingNexusServiceImpl()) - .build(); - - @BeforeClass - public static void requireExternalService() { - // The server-side response link implementation (temporalio/temporal#9897) is gated by - // EnableCHASMSignalBacklinks and is only present in real servers. - assumeTrue( - "signal response links require a real server with EnableCHASMSignalBacklinks=true", - SDKTestWorkflowRule.useExternalService); - } - - // ── Tests ──────────────────────────────────────────────────────────────────────────────── - - @Test - public void testSignalOperationLinks() { - runTwoSignalScenario(); - } - - /** - * One Nexus operation signals three different callees. The handler's three signal-class RPCs each - * contribute a response link and all three end up on the caller's single {@code - * NexusOperationCompleted} event. - */ - @Test - public void testMultiSignalOperationLinks() { - WorkflowClient client = testWorkflowRule.getWorkflowClient(); - List calleeIds = Arrays.asList("multicallee-a", "multicallee-b", "multicallee-c"); - - TestWorkflows.TestWorkflow1 callerStub = - testWorkflowRule.newWorkflowStubTimeoutOptions( - TestWorkflows.TestWorkflow1.class, "multicaller"); - String result = - callerStub.execute(MODE_MULTI_SIGNAL_WITH_START + ":" + String.join(",", calleeIds)); - Assert.assertEquals("ok:multi:" + String.join(",", calleeIds), result); - - // Each callee gets one signal and completes. - for (String calleeId : calleeIds) { - String calleeResult = client.newUntypedWorkflowStub(calleeId).getResult(String.class); - Assert.assertEquals("multi-signal", calleeResult); - } - - String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); - History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); - - // Caller → each callee: forward links on every callee's WorkflowExecutionSignaled event. - for (String calleeId : calleeIds) { - History calleeHistory = client.fetchHistory(calleeId).getHistory(); - assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 1); - } - - // Callee → caller: the single NexusOperationCompleted carries one response link per callee. - List completedEvents = - getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); - Assert.assertEquals( - "expected exactly one NexusOperationCompleted event", 1, completedEvents.size()); - HistoryEvent completed = completedEvents.get(0); - Assert.assertEquals( - "expected one response link per signaled callee", - calleeIds.size(), - completed.getLinksCount()); - List responseLinkWorkflowIds = new ArrayList<>(); - for (int i = 0; i < completed.getLinksCount(); i++) { - io.temporal.api.common.v1.Link.WorkflowEvent responseLink = - completed.getLinks(i).getWorkflowEvent(); - responseLinkWorkflowIds.add(responseLink.getWorkflowId()); - EventType responseLinkEventType = - responseLink.hasRequestIdRef() - ? responseLink.getRequestIdRef().getEventType() - : responseLink.getEventRef().getEventType(); - Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, responseLinkEventType); - } - Assert.assertTrue( - "expected response links to reference all three callees: " + responseLinkWorkflowIds, - responseLinkWorkflowIds.containsAll(calleeIds)); - } - - /** - * Async response path: handler signals the callee then returns an async result. Verifies that the - * response link lands on {@code NexusOperationStarted} (the async branch in - * NexusTaskHandlerImpl). - */ - @Test - public void testAsyncSignalOperationLinks() { - WorkflowClient client = testWorkflowRule.getWorkflowClient(); - String calleeWorkflowId = "async-callee"; - - TestWorkflows.TestWorkflow1 callerStub = - testWorkflowRule.newWorkflowStubTimeoutOptions( - TestWorkflows.TestWorkflow1.class, "async-caller"); - String result = callerStub.execute(MODE_ASYNC_SIGNAL_WITH_START + ":" + calleeWorkflowId); - Assert.assertEquals("async-started", result); - - String calleeResult = client.newUntypedWorkflowStub(calleeWorkflowId).getResult(String.class); - Assert.assertEquals("async-signal", calleeResult); - - String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); - History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); - History calleeHistory = client.fetchHistory(calleeWorkflowId).getHistory(); - - assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 1); - - // Backward direction lands on NexusOperationStarted for the async response path. - List startedEvents = - getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); - Assert.assertEquals( - "expected exactly one NexusOperationStarted event for the async op", - 1, - startedEvents.size()); - assertResponseLink(startedEvents.get(0), calleeWorkflowId); - } - - // ── Shared scenario + assertion helpers ────────────────────────────────────────────────── - - /** Drive the two-signal flow (signalWithStart + plain signal) and assert link propagation. */ - private void runTwoSignalScenario() { - WorkflowClient client = testWorkflowRule.getWorkflowClient(); - String calleeWorkflowId = "callee"; - - TestWorkflows.TestWorkflow1 callerStub = - testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class, "caller"); - String result = callerStub.execute("twoSync:" + calleeWorkflowId); - Assert.assertEquals("ok:signalWithStart|ok:signal", result); - - String calleeResult = client.newUntypedWorkflowStub(calleeWorkflowId).getResult(String.class); - Assert.assertEquals("first,second", calleeResult); - - String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); - History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); - History calleeHistory = client.fetchHistory(calleeWorkflowId).getHistory(); - - assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 2); - - List completedEvents = - getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); - Assert.assertEquals( - "expected two NexusOperationCompleted events on the caller", 2, completedEvents.size()); - for (HistoryEvent completed : completedEvents) { - assertResponseLink(completed, calleeWorkflowId); - } - } - - /** - * Assert that the callee history has {@code expectedCount} {@code WorkflowExecutionSignaled} - * events, each linked back to the caller's {@code NexusOperationScheduled} event. - */ - private static void assertForwardLinks( - History calleeHistory, String callerWorkflowId, int expectedCount) { - List signaledEvents = - getAllEventsOfType(calleeHistory, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); - Assert.assertEquals( - "expected " + expectedCount + " WorkflowExecutionSignaled events on the callee", - expectedCount, - signaledEvents.size()); - for (HistoryEvent signaled : signaledEvents) { - Assert.assertTrue( - "expected at least one link on each WorkflowExecutionSignaled event", - signaled.getLinksCount() >= 1); - Assert.assertEquals( - "signaled-event link should reference the caller workflow", - callerWorkflowId, - signaled.getLinks(0).getWorkflowEvent().getWorkflowId()); - Assert.assertEquals( - EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, - signaled.getLinks(0).getWorkflowEvent().getEventRef().getEventType()); - } - } - - /** - * Assert that a single caller-side event ({@code NexusOperationCompleted} or {@code - * NexusOperationStarted}) carries a response link to the callee's {@code - * WorkflowExecutionSignaled} event. Server PR #9897 keys these via {@code RequestIdReference} - * rather than {@code EventReference}, so we accept either oneof variant. - */ - private static void assertResponseLink(HistoryEvent event, String calleeWorkflowId) { - Assert.assertTrue( - "expected a signal-event response link on " + event.getEventType().name(), - event.getLinksCount() >= 1); - io.temporal.api.common.v1.Link.WorkflowEvent responseLink = - event.getLinks(0).getWorkflowEvent(); - Assert.assertEquals(calleeWorkflowId, responseLink.getWorkflowId()); - EventType responseLinkEventType = - responseLink.hasRequestIdRef() - ? responseLink.getRequestIdRef().getEventType() - : responseLink.getEventRef().getEventType(); - Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, responseLinkEventType); - } - - /** Find all history events of a given type, in order. */ - private static List getAllEventsOfType(History history, EventType type) { - List out = new ArrayList<>(); - for (HistoryEvent e : history.getEventsList()) { - if (e.getEventType() == type) { - out.add(e); - } - } - return out; - } - - // ── Workflows ──────────────────────────────────────────────────────────────────────────── - - /** - * Caller workflow. Branches on a mode prefix in the input: - * - *

    - *
  • {@code twoSync:} — invoke the nexus op twice synchronously (signalWithStart, - * then signal). - *
  • {@code multi:,,} — invoke the nexus op once synchronously; handler - * signalWithStart's each id. - *
  • {@code asyncSignalWithStart:} — invoke the nexus op asynchronously via {@code - * Workflow.startNexusOperation}; wait for execution start and return without waiting for - * the operation result. - *
- */ - public static class SignalCallerWorkflow implements TestWorkflows.TestWorkflow1 { - @Override - public String execute(String input) { - String[] parts = input.split(":", 2); - String mode = parts[0]; - String rest = parts[1]; - - TestNexusServices.TestNexusService1 stub = - Workflow.newNexusServiceStub( - TestNexusServices.TestNexusService1.class, - NexusServiceOptions.newBuilder() - .setOperationOptions( - NexusOperationOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofSeconds(30)) - .build()) - .build()); - - switch (mode) { - case "twoSync": - { - String r1 = stub.operation(MODE_SIGNAL_WITH_START + ":" + rest); - String r2 = stub.operation(MODE_SIGNAL + ":" + rest); - return r1 + "|" + r2; - } - case MODE_MULTI_SIGNAL_WITH_START: - return stub.operation(MODE_MULTI_SIGNAL_WITH_START + ":" + rest); - case MODE_ASYNC_SIGNAL_WITH_START: - { - NexusOperationHandle h = - Workflow.startNexusOperation( - stub::operation, MODE_ASYNC_SIGNAL_WITH_START + ":" + rest); - // Wait for the async op to be Started (the event that carries the response link) but - // not for its eventual result — the async op completes outside this workflow. - h.getExecution().get(); - return "async-started"; - } - default: - throw new IllegalArgumentException("unknown mode: " + mode); - } - } - } - - /** Callee workflow. Awaits {@code expectedSignals} signals then returns their joined payloads. */ - @WorkflowInterface - public interface SignalCalleeWorkflow { - @WorkflowMethod - String execute(int expectedSignals); - - @SignalMethod - void ping(String msg); - } - - public static class SignalCalleeWorkflowImpl implements SignalCalleeWorkflow { - private final List received = new ArrayList<>(); - - @Override - public String execute(int expectedSignals) { - Workflow.await(() -> received.size() >= expectedSignals); - return String.join(",", received); - } - - @Override - public void ping(String msg) { - received.add(msg); - } - } - - // ── Nexus service ──────────────────────────────────────────────────────────────────────── - - /** - * Single Nexus operation that dispatches based on a mode prefix in its input. Supports sync and - * async return shapes. - */ - @ServiceImpl(service = TestNexusServices.TestNexusService1.class) - public static class SignalingNexusServiceImpl { - - @OperationImpl - public OperationHandler operation() { - return new OperationHandler() { - @Override - public OperationStartResult start( - OperationContext ctx, OperationStartDetails details, @Nullable String input) { - String[] parts = input.split(":", 2); - String mode = parts[0]; - String rest = parts[1]; - - io.temporal.nexus.NexusOperationContext opCtx = Nexus.getOperationContext(); - WorkflowClient client = opCtx.getWorkflowClient(); - String taskQueue = opCtx.getInfo().getTaskQueue(); - - switch (mode) { - case MODE_SIGNAL_WITH_START: - signalWithStart(client, rest, taskQueue, /* expectedSignals= */ 2, "first"); - return OperationStartResult.sync("ok:" + MODE_SIGNAL_WITH_START); - case MODE_SIGNAL: - client.newWorkflowStub(SignalCalleeWorkflow.class, rest).ping("second"); - return OperationStartResult.sync("ok:" + MODE_SIGNAL); - case MODE_MULTI_SIGNAL_WITH_START: - for (String id : rest.split(",")) { - signalWithStart(client, id, taskQueue, /* expectedSignals= */ 1, "multi-signal"); - } - return OperationStartResult.sync("ok:multi:" + rest); - case MODE_ASYNC_SIGNAL_WITH_START: - signalWithStart(client, rest, taskQueue, /* expectedSignals= */ 1, "async-signal"); - // Async branch in NexusTaskHandlerImpl. The caller never waits for completion, so - // the token is opaque. - return OperationStartResult.async("async-op-" + UUID.randomUUID()); - default: - throw new IllegalArgumentException("unknown mode: " + mode); - } - } - - @Override - public void cancel(OperationContext ctx, OperationCancelDetails details) { - // Not exercised in these tests. - } - }; - } - - private static void signalWithStart( - WorkflowClient client, - String calleeWorkflowId, - String taskQueue, - int expectedSignals, - String signalPayload) { - SignalCalleeWorkflow startStub = - client.newWorkflowStub( - SignalCalleeWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(calleeWorkflowId) - .setTaskQueue(taskQueue) - .build()); - BatchRequest batch = client.newSignalWithStartRequest(); - batch.add(startStub::execute, expectedSignals); - batch.add(startStub::ping, signalPayload); - client.signalWithStart(batch); - } - } -} diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/DescribeWorkflowAsserter.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/DescribeWorkflowAsserter.java index 2af5675cec..90e36fa451 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/DescribeWorkflowAsserter.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/DescribeWorkflowAsserter.java @@ -189,20 +189,11 @@ public DescribeWorkflowAsserter assertPendingChildrenCount(int expected) { return this; } - /** - * Assert that every expected request-id info is present and matches. Extra entries on the actual - * map are tolerated: a real server records request IDs that the in-memory test server does not - * (for example, the request ID of a signal RPC), so callers assert only the entries they control. - */ public DescribeWorkflowAsserter assertRequestIdInfos(Map expected) { - Map actualInfos = - actual.getWorkflowExtendedInfo().getRequestIdInfosMap(); - expected.forEach( - (requestId, info) -> - Assert.assertEquals( - "request id info for " + requestId + " should match", - info, - actualInfos.get(requestId))); + Assert.assertEquals( + "request id infos should match", + expected, + actual.getWorkflowExtendedInfo().getRequestIdInfosMap()); return this; } }