From 63980a843379b5297e1cc46d1de6615fd0bcb0a5 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 12 Mar 2026 10:03:43 -0700 Subject: [PATCH 1/3] Fix time skipping for dotnet SDK --- .../testservice/TestWorkflowMutableState.java | 2 + .../TestWorkflowMutableStateImpl.java | 86 ++++- .../testservice/TestWorkflowService.java | 7 +- .../reachability-metadata.json | 343 +++--------------- .../functional/NexusWorkflowTest.java | 257 +++++++++++++ 5 files changed, 390 insertions(+), 305 deletions(-) diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index a1fab0a44..41c994727 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -107,6 +107,8 @@ void startNexusOperation( void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref); + void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure); + void completeNexusOperation(NexusOperationRef ref, Payload result); void completeAsyncNexusOperation( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 9e14382a9..1bf67ba8c 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -105,6 +105,10 @@ private interface UpdateProcedure { private final Map activityById = new HashMap<>(); private final Map> childWorkflows = new HashMap<>(); private final Map> nexusOperations = new HashMap<>(); + // Tracks cancelRequestedEventId by scheduledEventId, persists after operation removal. + private final Map nexusCancelRequestedEventIds = new HashMap<>(); + // Tracks scheduledEventIds of nexus cancel requests that have not yet received a response. + private final Set unresolvedNexusCancelRequests = new HashSet<>(); private final Map> timers = new HashMap<>(); private final Map> externalSignals = new HashMap<>(); private final Map> externalCancellations = @@ -486,10 +490,13 @@ public void completeWorkflowTask( .asRuntimeException(); } - if (unhandledCommand(request) || unhandledMessages(request)) { + if (unhandledCommand(request) + || unhandledMessages(request) + || hasUnresolvedNexusCancelWithCompletion(request)) { // Fail the workflow task if there are new events or messages and a command tries to - // complete the workflow. Record the failure in history, then throw an error to the - // caller (matching real server behavior). + // complete the workflow, or if there are unresolved nexus cancel requests. Record the + // failure in history, then throw an error to the caller (matching real server + // behavior). failWorkflowTaskWithAReason( WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, null, @@ -673,6 +680,12 @@ private boolean unhandledMessages(RespondWorkflowTaskCompletedRequest request) { && hasCompletionCommand(request.getCommandsList())); } + private boolean hasUnresolvedNexusCancelWithCompletion( + RespondWorkflowTaskCompletedRequest request) { + return !unresolvedNexusCancelRequests.isEmpty() + && hasCompletionCommand(request.getCommandsList()); + } + private boolean hasCompletionCommand(List commands) { for (Command command : commands) { if (WorkflowExecutionUtils.isWorkflowExecutionCompleteCommand(command)) { @@ -899,6 +912,12 @@ private void processRequestCancelNexusOperation( ctx.setNeedWorkflowTask(true); } else { operation.action(Action.REQUEST_CANCELLATION, ctx, null, workflowTaskCompletedId); + ctx.onCommit( + historySize -> { + nexusCancelRequestedEventIds.put( + scheduleEventId, operation.getData().cancelRequestedEventId); + unresolvedNexusCancelRequests.add(scheduleEventId); + }); ctx.addTimer( ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout), () -> @@ -2339,6 +2358,10 @@ public void startNexusOperation( update( ctx -> { StateMachine operation = getPendingNexusOperation(scheduledEventId); + if (operation.getState() == State.STARTED) { + // Operation was already started (e.g. from a previous attempt before retry). + return; + } operation.action(StateMachines.Action.START, ctx, resp, 0); operation.getData().identity = clientIdentity; @@ -2378,13 +2401,30 @@ public void cancelNexusOperation(NexusOperationRef ref, Failure failure) { }); } + /** + * Resolves the cancelRequestedEventId for a nexus operation, checking both the active operations + * map and the persisted cancel request IDs (for operations that have already completed/removed). + */ + private long resolveCancelRequestedEventId(long scheduledEventId) { + StateMachine operation = nexusOperations.get(scheduledEventId); + if (operation != null) { + return operation.getData().cancelRequestedEventId; + } + Long stored = nexusCancelRequestedEventIds.get(scheduledEventId); + return stored != null ? stored : 0; + } + @Override public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) { update( ctx -> { StateMachine operation = - getPendingNexusOperation(ref.getScheduledEventId()); - if (!operationInFlight(operation.getState())) { + nexusOperations.get(ref.getScheduledEventId()); + if (operation != null && !operationInFlight(operation.getState())) { + return; + } + long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId()); + if (cancelRequestedEventId == 0) { return; } ctx.addEvent( @@ -2393,12 +2433,39 @@ public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) { .setNexusOperationCancelRequestCompletedEventAttributes( NexusOperationCancelRequestCompletedEventAttributes.newBuilder() .setScheduledEventId(ref.getScheduledEventId()) - .setRequestedEventId(operation.getData().cancelRequestedEventId)) + .setRequestedEventId(cancelRequestedEventId)) .build()); + ctx.onCommit( + historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId())); + scheduleWorkflowTask(ctx); ctx.unlockTimer("cancelNexusOperationRequestAcknowledge"); }); } + @Override + public void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure) { + update( + ctx -> { + long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId()); + if (cancelRequestedEventId == 0) { + return; + } + ctx.addEvent( + HistoryEvent.newBuilder() + .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED) + .setNexusOperationCancelRequestFailedEventAttributes( + NexusOperationCancelRequestFailedEventAttributes.newBuilder() + .setScheduledEventId(ref.getScheduledEventId()) + .setRequestedEventId(cancelRequestedEventId) + .setFailure(failure)) + .build()); + ctx.onCommit( + historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId())); + scheduleWorkflowTask(ctx); + ctx.unlockTimer("failNexusOperationCancelRequest"); + }); + } + @Override public void completeNexusOperation(NexusOperationRef ref, Payload result) { update( @@ -2471,6 +2538,8 @@ private void timeoutNexusOperation( } operation.action(StateMachines.Action.TIME_OUT, ctx, timeoutType, 0); nexusOperations.remove(scheduledEventId); + // The cancel response won't matter after a timeout, so clear the unresolved cancel. + unresolvedNexusCancelRequests.remove(scheduledEventId); scheduleWorkflowTask(ctx); }); } catch (StatusRuntimeException e) { @@ -2496,7 +2565,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in ctx -> { StateMachine operation = getPendingNexusOperation(scheduledEventId); if (attempt != operation.getData().getAttempt() - || isTerminalState(operation.getState())) { + || isTerminalState(operation.getState()) + || operation.getState() == State.STARTED) { throw Status.NOT_FOUND.withDescription("Timer fired earlier").asRuntimeException(); } @@ -2510,6 +2580,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in if (isTerminalState(operation.getState())) { nexusOperations.remove(scheduledEventId); + // Cancel response won't arrive after terminal state, unblock workflow completion. + unresolvedNexusCancelRequests.remove(scheduledEventId); scheduleWorkflowTask(ctx); } else { retryNexusTask(ctx, operation); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index b4c16d1c6..7cf920266 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -1119,7 +1119,12 @@ public void respondNexusTaskFailed( NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken()); TestWorkflowMutableState mutableState = getMutableState(tt.getOperationRef().getExecutionId()); - if (mutableState.validateOperationTaskToken(tt)) { + Failure failure = handlerErrorToFailure(request.getError()); + if (tt.isCancel()) { + // For cancel failures, the operation may already be completed/removed, + // so skip token validation and record the event directly. + mutableState.failNexusOperationCancelRequest(tt.getOperationRef(), failure); + } else if (mutableState.validateOperationTaskToken(tt)) { mutableState.failNexusOperation(tt.getOperationRef(), failure); } responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance()); diff --git a/temporal-test-server/src/main/resources/META-INF/native-image/io.temporal/temporal-test-server/reachability-metadata.json b/temporal-test-server/src/main/resources/META-INF/native-image/io.temporal/temporal-test-server/reachability-metadata.json index 61662ee4b..5a618cfa9 100644 --- a/temporal-test-server/src/main/resources/META-INF/native-image/io.temporal/temporal-test-server/reachability-metadata.json +++ b/temporal-test-server/src/main/resources/META-INF/native-image/io.temporal/temporal-test-server/reachability-metadata.json @@ -1028,31 +1028,19 @@ }, { "type": "io.temporal.api.common.v1.Payload", - "methods": [ - { - "name": "getData", - "parameterTypes": [] - }, - { - "name": "getDefaultInstance", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { - "type": "io.temporal.api.common.v1.Payload$Builder" + "type": "io.temporal.api.common.v1.Payload$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.common.v1.Payloads", - "methods": [ - { - "name": "getPayloadsList", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { - "type": "io.temporal.api.common.v1.Payloads$Builder" + "type": "io.temporal.api.common.v1.Payloads$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.common.v1.Priority" @@ -1213,330 +1201,91 @@ }, { "type": "io.temporal.api.failure.v1.ActivityFailureInfo", - "methods": [ - { - "name": "getActivityId", - "parameterTypes": [] - }, - { - "name": "getActivityType", - "parameterTypes": [] - }, - { - "name": "getIdentity", - "parameterTypes": [] - }, - { - "name": "getRetryStateValue", - "parameterTypes": [] - }, - { - "name": "getScheduledEventId", - "parameterTypes": [] - }, - { - "name": "getStartedEventId", - "parameterTypes": [] - }, - { - "name": "hasActivityType", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.ActivityFailureInfo$Builder" + "type": "io.temporal.api.failure.v1.ActivityFailureInfo$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.ApplicationFailureInfo", - "methods": [ - { - "name": "getCategoryValue", - "parameterTypes": [] - }, - { - "name": "getNonRetryable", - "parameterTypes": [] - }, - { - "name": "getType", - "parameterTypes": [] - }, - { - "name": "hasDetails", - "parameterTypes": [] - }, - { - "name": "hasNextRetryDelay", - "parameterTypes": [] - }, - { - "name": "newBuilder", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.ApplicationFailureInfo$Builder", - "methods": [ - { - "name": "getNonRetryable", - "parameterTypes": [] - }, - { - "name": "getType", - "parameterTypes": [] - }, - { - "name": "setNonRetryable", - "parameterTypes": [ - "boolean" - ] - }, - { - "name": "setType", - "parameterTypes": [ - "java.lang.String" - ] - } - ] + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.CanceledFailureInfo", - "methods": [ - { - "name": "getDetails", - "parameterTypes": [] - }, - { - "name": "hasDetails", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.CanceledFailureInfo$Builder" + "type": "io.temporal.api.failure.v1.CanceledFailureInfo$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.ChildWorkflowExecutionFailureInfo", - "methods": [ - { - "name": "getInitiatedEventId", - "parameterTypes": [] - }, - { - "name": "getNamespace", - "parameterTypes": [] - }, - { - "name": "getRetryStateValue", - "parameterTypes": [] - }, - { - "name": "getStartedEventId", - "parameterTypes": [] - }, - { - "name": "getWorkflowExecution", - "parameterTypes": [] - }, - { - "name": "getWorkflowType", - "parameterTypes": [] - }, - { - "name": "hasWorkflowExecution", - "parameterTypes": [] - }, - { - "name": "hasWorkflowType", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.ChildWorkflowExecutionFailureInfo$Builder" + "type": "io.temporal.api.failure.v1.ChildWorkflowExecutionFailureInfo$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.Failure", - "methods": [ - { - "name": "getActivityFailureInfo", - "parameterTypes": [] - }, - { - "name": "getApplicationFailureInfo", - "parameterTypes": [] - }, - { - "name": "getCanceledFailureInfo", - "parameterTypes": [] - }, - { - "name": "getCause", - "parameterTypes": [] - }, - { - "name": "getChildWorkflowExecutionFailureInfo", - "parameterTypes": [] - }, - { - "name": "getFailureInfoCase", - "parameterTypes": [] - }, - { - "name": "getMessage", - "parameterTypes": [] - }, - { - "name": "getNexusHandlerFailureInfo", - "parameterTypes": [] - }, - { - "name": "getNexusOperationExecutionFailureInfo", - "parameterTypes": [] - }, - { - "name": "getSource", - "parameterTypes": [] - }, - { - "name": "getStackTrace", - "parameterTypes": [] - }, - { - "name": "getTerminatedFailureInfo", - "parameterTypes": [] - }, - { - "name": "getTimeoutFailureInfo", - "parameterTypes": [] - }, - { - "name": "hasCause", - "parameterTypes": [] - }, - { - "name": "hasEncodedAttributes", - "parameterTypes": [] - }, - { - "name": "newBuilder", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.Failure$Builder", - "methods": [ - { - "name": "getFailureInfoCase", - "parameterTypes": [] - }, - { - "name": "getSource", - "parameterTypes": [] - }, - { - "name": "getStackTrace", - "parameterTypes": [] - }, - { - "name": "setApplicationFailureInfo", - "parameterTypes": [ - "io.temporal.api.failure.v1.ApplicationFailureInfo" - ] - }, - { - "name": "setSource", - "parameterTypes": [ - "java.lang.String" - ] - }, - { - "name": "setStackTrace", - "parameterTypes": [ - "java.lang.String" - ] - } - ] + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.NexusHandlerFailureInfo", - "methods": [ - { - "name": "getRetryBehaviorValue", - "parameterTypes": [] - }, - { - "name": "getType", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.NexusHandlerFailureInfo$Builder" + "type": "io.temporal.api.failure.v1.NexusHandlerFailureInfo$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.NexusOperationFailureInfo", - "methods": [ - { - "name": "getEndpoint", - "parameterTypes": [] - }, - { - "name": "getOperation", - "parameterTypes": [] - }, - { - "name": "getOperationId", - "parameterTypes": [] - }, - { - "name": "getOperationToken", - "parameterTypes": [] - }, - { - "name": "getScheduledEventId", - "parameterTypes": [] - }, - { - "name": "getService", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true + }, + { + "type": "io.temporal.api.failure.v1.NexusOperationFailureInfo$Builder", + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.NexusOperationFailureInfo$Builder" + "type": "io.temporal.api.failure.v1.ResetWorkflowFailureInfo", + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.ResetWorkflowFailureInfo" + "type": "io.temporal.api.failure.v1.ResetWorkflowFailureInfo$Builder", + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.ServerFailureInfo" + "type": "io.temporal.api.failure.v1.ServerFailureInfo", + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.TerminatedFailureInfo" + "type": "io.temporal.api.failure.v1.ServerFailureInfo$Builder", + "allDeclaredMethods": true + }, + { + "type": "io.temporal.api.failure.v1.TerminatedFailureInfo", + "allDeclaredMethods": true + }, + { + "type": "io.temporal.api.failure.v1.TerminatedFailureInfo$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.failure.v1.TimeoutFailureInfo", - "methods": [ - { - "name": "getLastHeartbeatDetails", - "parameterTypes": [] - }, - { - "name": "getTimeoutTypeValue", - "parameterTypes": [] - }, - { - "name": "hasLastHeartbeatDetails", - "parameterTypes": [] - } - ] + "allDeclaredMethods": true }, { - "type": "io.temporal.api.failure.v1.TimeoutFailureInfo$Builder" + "type": "io.temporal.api.failure.v1.TimeoutFailureInfo$Builder", + "allDeclaredMethods": true }, { "type": "io.temporal.api.history.v1.ActivityPropertiesModifiedExternallyEventAttributes" diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 77d78cba9..bbb903cc0 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -955,6 +955,263 @@ public void testNexusOperationHandlerError() { } } + @Test + public void testNexusOperationHandlerErrorMessagePreserved() { + // Verifies that handler error failure message is preserved in the NexusHandlerFailureInfo + // wrapper (not just set on the cause). + CompletableFuture nexusPoller = + pollNexusTask() + .thenCompose( + task -> + failNexusTask( + task.getTaskToken(), + HandlerError.newBuilder() + .setErrorType("BAD_REQUEST") + .setFailure( + Failure.newBuilder().setMessage("specific handler error message")) + .build())); + + try { + WorkflowStub stub = newWorkflowStub("TestNexusOperationHandlerErrorMessageWorkflow"); + WorkflowExecution execution = stub.start(); + + PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); + completeWorkflowTask(pollResp.getTaskToken(), newScheduleOperationCommand()); + + nexusPoller.get(); + + pollResp = pollWorkflowTask(); + completeWorkflow(pollResp.getTaskToken()); + + List events = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_FAILED); + Assert.assertEquals(1, events.size()); + io.temporal.api.failure.v1.Failure failure = + events.get(0).getNexusOperationFailedEventAttributes().getFailure(); + // The handler failure wrapper should have the handler error message set + io.temporal.api.failure.v1.Failure cause = failure.getCause(); + Assert.assertTrue(cause.hasNexusHandlerFailureInfo()); + Assert.assertEquals("specific handler error message", cause.getMessage()); + // The cause of the handler failure should also have the message + Assert.assertEquals("specific handler error message", cause.getCause().getMessage()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + nexusPoller.cancel(true); + } + } + + @Test(timeout = 30000) + public void testNexusOperationCancelRequestFailed() { + assumeTrue( + "Skipping for real server since this test manipulates cancel task tokens", + !testWorkflowRule.isUseExternalService()); + // Verifies that when a cancel handler fails, a NEXUS_OPERATION_CANCEL_REQUEST_FAILED event + // is recorded in history. + String operationId = UUID.randomUUID().toString(); + CompletableFuture nexusPoller = + pollNexusTask().thenCompose(task -> completeNexusTask(task, operationId)); + + try { + WorkflowStub stub = newWorkflowStub("TestNexusOperationCancelRequestFailedWorkflow"); + WorkflowExecution execution = stub.start(); + + // Schedule the nexus operation + PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); + completeWorkflowTask(pollResp.getTaskToken(), newScheduleOperationCommand()); + + // Wait for async start to complete + nexusPoller.get(); + + // Verify started event and get scheduled event ID for cancel command + pollResp = pollWorkflowTask(); + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); + List scheduledEvents = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + Assert.assertEquals(1, scheduledEvents.size()); + long scheduledEventId = scheduledEvents.get(0).getEventId(); + + // Issue cancel command + Command cancelCmd = + Command.newBuilder() + .setCommandType(CommandType.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION) + .setRequestCancelNexusOperationCommandAttributes( + RequestCancelNexusOperationCommandAttributes.newBuilder() + .setScheduledEventId(scheduledEventId)) + .build(); + completeWorkflowTask(pollResp.getTaskToken(), cancelCmd); + + // Poll for the cancel nexus task and fail it + PollNexusTaskQueueResponse cancelTask = pollNexusTask().get(); + Assert.assertTrue( + "Expected cancel task", NexusTaskToken.fromBytes(cancelTask.getTaskToken()).isCancel()); + failNexusTask( + cancelTask.getTaskToken(), + HandlerError.newBuilder() + .setErrorType("INTERNAL") + .setFailure(Failure.newBuilder().setMessage("cancel handler failed")) + .build()) + .get(); + + // Verify NEXUS_OPERATION_CANCEL_REQUEST_FAILED event is recorded + pollResp = pollWorkflowTask(); + List cancelFailedEvents = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED); + Assert.assertEquals(1, cancelFailedEvents.size()); + io.temporal.api.failure.v1.Failure cancelFailure = + cancelFailedEvents + .get(0) + .getNexusOperationCancelRequestFailedEventAttributes() + .getFailure(); + Assert.assertTrue(cancelFailure.hasNexusHandlerFailureInfo()); + Assert.assertEquals("cancel handler failed", cancelFailure.getMessage()); + + completeWorkflow(pollResp.getTaskToken()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + nexusPoller.cancel(true); + } + } + + @Test(timeout = 30000) + public void testNexusOperationCancelRequestAcknowledgeSchedulesWorkflowTask() { + assumeTrue( + "Skipping for real server since this test checks cancel ack WFT scheduling", + !testWorkflowRule.isUseExternalService()); + // Verifies that when cancel is acknowledged, a workflow task is scheduled so the SDK + // can process the NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED event. + String operationId = UUID.randomUUID().toString(); + CompletableFuture nexusPoller = + pollNexusTask().thenCompose(task -> completeNexusTask(task, operationId)); + + try { + WorkflowStub stub = newWorkflowStub("TestNexusOperationCancelRequestAckSchedulesWFTWorkflow"); + WorkflowExecution execution = stub.start(); + + // Schedule the nexus operation + PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); + completeWorkflowTask(pollResp.getTaskToken(), newScheduleOperationCommand()); + + // Wait for async start to complete + nexusPoller.get(); + + // Verify started event and get scheduled event ID for cancel command + pollResp = pollWorkflowTask(); + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); + List scheduledEvents = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + Assert.assertEquals(1, scheduledEvents.size()); + long scheduledEventId = scheduledEvents.get(0).getEventId(); + + // Issue cancel command + Command cancelCmd = + Command.newBuilder() + .setCommandType(CommandType.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION) + .setRequestCancelNexusOperationCommandAttributes( + RequestCancelNexusOperationCommandAttributes.newBuilder() + .setScheduledEventId(scheduledEventId)) + .build(); + completeWorkflowTask(pollResp.getTaskToken(), cancelCmd); + + // Poll for the cancel nexus task and complete it (acknowledge cancel) + PollNexusTaskQueueResponse cancelTask = pollNexusTask().get(); + Assert.assertTrue( + "Expected cancel task", NexusTaskToken.fromBytes(cancelTask.getTaskToken()).isCancel()); + completeNexusTask( + cancelTask, + Response.newBuilder() + .setCancelOperation(CancelOperationResponse.getDefaultInstance()) + .build()) + .get(); + + // The key assertion: a workflow task should be scheduled after cancel ack + // so the SDK can process the CANCEL_REQUEST_COMPLETED event + pollResp = pollWorkflowTask(); + List cancelCompletedEvents = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED); + Assert.assertEquals(1, cancelCompletedEvents.size()); + + completeWorkflow(pollResp.getTaskToken()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + nexusPoller.cancel(true); + } + } + + @Test(timeout = 30000) + public void testNexusOperationCanceledErrorWithCauseChain() { + // Verifies that a canceled OperationError with a JSON-encoded failure chain properly + // unwraps the CanceledFailureInfo and preserves the cause. + DataConverter dataConverter = DefaultDataConverter.newDefaultInstance(); + io.temporal.api.failure.v1.Failure cancelCause = + io.temporal.api.failure.v1.Failure.newBuilder() + .setMessage("operation canceled") + .setCanceledFailureInfo( + io.temporal.api.failure.v1.CanceledFailureInfo.getDefaultInstance()) + .build(); + String cancelCauseJson; + try { + cancelCauseJson = com.google.protobuf.util.JsonFormat.printer().print(cancelCause); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + + Response canceledResp = + Response.newBuilder() + .setStartOperation( + StartOperationResponse.newBuilder() + .setOperationError( + UnsuccessfulOperationError.newBuilder() + .setOperationState("canceled") + .setFailure( + Failure.newBuilder() + .setMessage("operation canceled") + .putMetadata("type", "temporal.api.failure.v1.Failure") + .setDetails(ByteString.copyFromUtf8(cancelCauseJson))))) + .build(); + CompletableFuture nexusPoller = + pollNexusTask().thenCompose(task -> completeNexusTask(task, canceledResp)); + + try { + WorkflowStub stub = newWorkflowStub("TestNexusOperationCanceledErrorCauseChainWorkflow"); + WorkflowExecution execution = stub.start(); + + PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); + completeWorkflowTask(pollResp.getTaskToken(), newScheduleOperationCommand()); + + nexusPoller.get(); + + pollResp = pollWorkflowTask(); + completeWorkflow(pollResp.getTaskToken()); + + List events = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED); + Assert.assertEquals(1, events.size()); + io.temporal.api.failure.v1.Failure failure = + events.get(0).getNexusOperationCanceledEventAttributes().getFailure(); + Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); + // The cause should have CanceledFailureInfo unwrapped from the JSON chain + io.temporal.api.failure.v1.Failure cause = failure.getCause(); + Assert.assertTrue("Cause should have CanceledFailureInfo", cause.hasCanceledFailureInfo()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + nexusPoller.cancel(true); + } + } + @Test public void testNexusOperationHandlerTemporalFailure() { DataConverter dataConverter = DefaultDataConverter.newDefaultInstance(); From bc6bb169a87580b44324f79ef36be7c32438937c Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 12 Mar 2026 11:43:39 -0700 Subject: [PATCH 2/3] rebase --- .../io/temporal/internal/testservice/TestWorkflowService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 7cf920266..7183ade76 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -1119,7 +1119,6 @@ public void respondNexusTaskFailed( NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken()); TestWorkflowMutableState mutableState = getMutableState(tt.getOperationRef().getExecutionId()); - Failure failure = handlerErrorToFailure(request.getError()); if (tt.isCancel()) { // For cancel failures, the operation may already be completed/removed, // so skip token validation and record the event directly. @@ -1198,6 +1197,7 @@ public void completeNexusOperation( private static Failure handlerErrorToFailure(HandlerError err) { return Failure.newBuilder() + .setMessage(err.getFailure().getMessage()) .setNexusHandlerFailureInfo( NexusHandlerFailureInfo.newBuilder() .setType(err.getErrorType()) From e9cb1c588c57efbc5e142471aa6bc9330ed6eae5 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 12 Mar 2026 15:53:53 -0700 Subject: [PATCH 3/3] fix test --- .../functional/NexusWorkflowTest.java | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index bbb903cc0..26b269199 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -8,6 +8,7 @@ import io.temporal.api.common.v1.*; import io.temporal.api.common.v1.Link; import io.temporal.api.enums.v1.*; +import io.temporal.api.failure.v1.NexusHandlerFailureInfo; import io.temporal.api.failure.v1.NexusOperationFailureInfo; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.nexus.v1.*; @@ -965,10 +966,18 @@ public void testNexusOperationHandlerErrorMessagePreserved() { task -> failNexusTask( task.getTaskToken(), - HandlerError.newBuilder() - .setErrorType("BAD_REQUEST") - .setFailure( - Failure.newBuilder().setMessage("specific handler error message")) + io.temporal.api.failure.v1.Failure.newBuilder() + .setMessage("specific handler error message") + .setNexusHandlerFailureInfo( + NexusHandlerFailureInfo.newBuilder().setType("BAD_REQUEST")) + .setCause( + io.temporal.api.failure.v1.Failure.newBuilder() + .setMessage("specific handler error message") + .setApplicationFailureInfo( + io.temporal.api.failure.v1.ApplicationFailureInfo + .newBuilder() + .setType("NexusFailure") + .setNonRetryable(true))) .build())); try { @@ -1489,6 +1498,23 @@ private CompletableFuture completeNexusTask( }); } + private CompletableFuture failNexusTask( + ByteString taskToken, io.temporal.api.failure.v1.Failure failure) { + return CompletableFuture.supplyAsync( + () -> + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .respondNexusTaskFailed( + RespondNexusTaskFailedRequest.newBuilder() + .setIdentity(UUID.randomUUID().toString()) + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setTaskToken(taskToken) + .setFailure(failure) + .build())); + } + @SuppressWarnings("deprecation") // Uses deprecated HandlerError/setError() to test old format private CompletableFuture failNexusTask( ByteString taskToken, HandlerError err) {