Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ void startNexusOperation(

void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref);

void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure);

void completeNexusOperation(NexusOperationRef ref, Payload result);

void completeAsyncNexusOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ private interface UpdateProcedure {
private final Map<String, Long> activityById = new HashMap<>();
private final Map<Long, StateMachine<ChildWorkflowData>> childWorkflows = new HashMap<>();
private final Map<Long, StateMachine<NexusOperationData>> nexusOperations = new HashMap<>();
// Tracks cancelRequestedEventId by scheduledEventId, persists after operation removal.
private final Map<Long, Long> nexusCancelRequestedEventIds = new HashMap<>();
// Tracks scheduledEventIds of nexus cancel requests that have not yet received a response.
private final Set<Long> unresolvedNexusCancelRequests = new HashSet<>();
private final Map<String, StateMachine<TimerData>> timers = new HashMap<>();
private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
private final Map<String, StateMachine<CancelExternalData>> externalCancellations =
Expand Down Expand Up @@ -486,10 +490,13 @@ public void completeWorkflowTask(
.asRuntimeException();
}

if (unhandledCommand(request) || unhandledMessages(request)) {
if (unhandledCommand(request)
|| unhandledMessages(request)
|| hasUnresolvedNexusCancelWithCompletion(request)) {
// Fail the workflow task if there are new events or messages and a command tries to
// complete the workflow. Record the failure in history, then throw an error to the
// caller (matching real server behavior).
// complete the workflow, or if there are unresolved nexus cancel requests. Record the
// failure in history, then throw an error to the caller (matching real server
// behavior).
failWorkflowTaskWithAReason(
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND,
null,
Expand Down Expand Up @@ -673,6 +680,12 @@ private boolean unhandledMessages(RespondWorkflowTaskCompletedRequest request) {
&& hasCompletionCommand(request.getCommandsList()));
}

private boolean hasUnresolvedNexusCancelWithCompletion(
RespondWorkflowTaskCompletedRequest request) {
return !unresolvedNexusCancelRequests.isEmpty()
&& hasCompletionCommand(request.getCommandsList());
}

private boolean hasCompletionCommand(List<Command> commands) {
for (Command command : commands) {
if (WorkflowExecutionUtils.isWorkflowExecutionCompleteCommand(command)) {
Expand Down Expand Up @@ -899,6 +912,12 @@ private void processRequestCancelNexusOperation(
ctx.setNeedWorkflowTask(true);
} else {
operation.action(Action.REQUEST_CANCELLATION, ctx, null, workflowTaskCompletedId);
ctx.onCommit(
historySize -> {
nexusCancelRequestedEventIds.put(
scheduleEventId, operation.getData().cancelRequestedEventId);
unresolvedNexusCancelRequests.add(scheduleEventId);
});
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout),
() ->
Expand Down Expand Up @@ -2339,6 +2358,10 @@ public void startNexusOperation(
update(
ctx -> {
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
if (operation.getState() == State.STARTED) {
// Operation was already started (e.g. from a previous attempt before retry).
return;
}
operation.action(StateMachines.Action.START, ctx, resp, 0);
operation.getData().identity = clientIdentity;

Expand Down Expand Up @@ -2378,13 +2401,30 @@ public void cancelNexusOperation(NexusOperationRef ref, Failure failure) {
});
}

/**
* Resolves the cancelRequestedEventId for a nexus operation, checking both the active operations
* map and the persisted cancel request IDs (for operations that have already completed/removed).
*/
private long resolveCancelRequestedEventId(long scheduledEventId) {
StateMachine<NexusOperationData> operation = nexusOperations.get(scheduledEventId);
if (operation != null) {
return operation.getData().cancelRequestedEventId;
}
Long stored = nexusCancelRequestedEventIds.get(scheduledEventId);
return stored != null ? stored : 0;
}

@Override
public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
update(
ctx -> {
StateMachine<NexusOperationData> operation =
getPendingNexusOperation(ref.getScheduledEventId());
if (!operationInFlight(operation.getState())) {
nexusOperations.get(ref.getScheduledEventId());
if (operation != null && !operationInFlight(operation.getState())) {
return;
}
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
if (cancelRequestedEventId == 0) {
return;
}
ctx.addEvent(
Expand All @@ -2393,12 +2433,39 @@ public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
.setNexusOperationCancelRequestCompletedEventAttributes(
NexusOperationCancelRequestCompletedEventAttributes.newBuilder()
.setScheduledEventId(ref.getScheduledEventId())
.setRequestedEventId(operation.getData().cancelRequestedEventId))
.setRequestedEventId(cancelRequestedEventId))
.build());
ctx.onCommit(
historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId()));
scheduleWorkflowTask(ctx);
ctx.unlockTimer("cancelNexusOperationRequestAcknowledge");
});
}

@Override
public void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure) {
update(
ctx -> {
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
if (cancelRequestedEventId == 0) {
return;
}
ctx.addEvent(
HistoryEvent.newBuilder()
.setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED)
.setNexusOperationCancelRequestFailedEventAttributes(
NexusOperationCancelRequestFailedEventAttributes.newBuilder()
.setScheduledEventId(ref.getScheduledEventId())
.setRequestedEventId(cancelRequestedEventId)
.setFailure(failure))
.build());
ctx.onCommit(
historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId()));
scheduleWorkflowTask(ctx);
ctx.unlockTimer("failNexusOperationCancelRequest");
});
}

@Override
public void completeNexusOperation(NexusOperationRef ref, Payload result) {
update(
Expand Down Expand Up @@ -2471,6 +2538,8 @@ private void timeoutNexusOperation(
}
operation.action(StateMachines.Action.TIME_OUT, ctx, timeoutType, 0);
nexusOperations.remove(scheduledEventId);
// The cancel response won't matter after a timeout, so clear the unresolved cancel.
unresolvedNexusCancelRequests.remove(scheduledEventId);
scheduleWorkflowTask(ctx);
});
} catch (StatusRuntimeException e) {
Expand All @@ -2496,7 +2565,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in
ctx -> {
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
if (attempt != operation.getData().getAttempt()
|| isTerminalState(operation.getState())) {
|| isTerminalState(operation.getState())
|| operation.getState() == State.STARTED) {
throw Status.NOT_FOUND.withDescription("Timer fired earlier").asRuntimeException();
}

Expand All @@ -2510,6 +2580,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in

if (isTerminalState(operation.getState())) {
nexusOperations.remove(scheduledEventId);
// Cancel response won't arrive after terminal state, unblock workflow completion.
unresolvedNexusCancelRequests.remove(scheduledEventId);
scheduleWorkflowTask(ctx);
} else {
retryNexusTask(ctx, operation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,11 @@ public void respondNexusTaskFailed(
NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken());
TestWorkflowMutableState mutableState =
getMutableState(tt.getOperationRef().getExecutionId());
if (mutableState.validateOperationTaskToken(tt)) {
if (tt.isCancel()) {
// For cancel failures, the operation may already be completed/removed,
// so skip token validation and record the event directly.
mutableState.failNexusOperationCancelRequest(tt.getOperationRef(), failure);
} else if (mutableState.validateOperationTaskToken(tt)) {
mutableState.failNexusOperation(tt.getOperationRef(), failure);
}
responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance());
Expand Down Expand Up @@ -1193,6 +1197,7 @@ public void completeNexusOperation(

private static Failure handlerErrorToFailure(HandlerError err) {
return Failure.newBuilder()
.setMessage(err.getFailure().getMessage())
.setNexusHandlerFailureInfo(
NexusHandlerFailureInfo.newBuilder()
.setType(err.getErrorType())
Expand Down
Loading
Loading