Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:

- name: Start containerized server and dependencies
env:
TEMPORAL_CLI_VERSION: 1.4.1-cloud-v1-29-0-139-2.0
TEMPORAL_CLI_VERSION: 1.6.1-server-1.31.0-151.0
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ plugins {

allprojects {
repositories {
mavenLocal()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SNAPSHOT dependency and mavenLocal not suitable for merge

Medium Severity

mavenLocal() was added to the allprojects repositories block alongside nexusVersion = '0.5.0-SNAPSHOT'. Both are development-time configurations — mavenLocal() can cause non-reproducible builds by resolving artifacts from the local cache, and SNAPSHOT dependencies are inherently unstable. These need to be replaced with a released version before merging.

Additional Locations (1)

Fix in Cursor Fix in Web

mavenCentral()
}
}
Expand All @@ -30,7 +31,7 @@ ext {
// Platforms
grpcVersion = '1.75.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.15.4' // [2.9.0,)
nexusVersion = '0.4.0-alpha'
nexusVersion = '0.5.0-SNAPSHOT'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.temporal.common.converter.FailureConverter;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.internal.common.FailureUtils;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
import io.temporal.serviceclient.CheckedExceptionWrapper;
Expand Down Expand Up @@ -192,7 +193,18 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d
retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE;
break;
}
return new HandlerException(info.getType(), cause, retryBehavior);
if (failure
.getMessage()
.startsWith(String.format("handler error (%s)", info.getType()))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is here to allow proper decoding of legacy errors, but for which languages exactly? If we mean to support legacy errors coming from Go or others, are we sure the casing returned by .getType() here would match the error type of other SDKs?, or should we make this condition a little bit more resilient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any old failure serialized by Go or Java. This is the handler error type so that should match the Nexus spec and consistent across Go and Java

return new HandlerException(info.getType(), cause, retryBehavior);
} else {
return new HandlerException(
info.getType(),
failure.getMessage(),
cause,
retryBehavior,
NexusUtil.temporalFailureToNexusFailureInfo(failure));
}
}
case FAILUREINFO_NOT_SET:
default:
Expand Down Expand Up @@ -324,6 +336,9 @@ private Failure exceptionToFailure(Throwable throwable) {
failure.setNexusOperationExecutionFailureInfo(op);
} else if (throwable instanceof HandlerException) {
HandlerException he = (HandlerException) throwable;
if (he.getOriginalFailure() != null) {
return NexusUtil.nexusFailureToAPIFailure(he.getOriginalFailure(), true);
}
NexusHandlerErrorRetryBehavior retryBehavior =
NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
switch (he.getRetryBehavior()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package io.temporal.internal.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.nexusrpc.FailureInfo;
import io.nexusrpc.Link;
import io.nexusrpc.handler.HandlerException;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
import io.temporal.api.nexus.v1.Failure;
import io.temporal.api.nexus.v1.HandlerError;
import io.temporal.common.converter.DataConverter;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -13,7 +21,8 @@
import java.util.Map;

public class NexusUtil {
private static final JsonFormat.Printer JSON_PRINTER =
private static final ObjectWriter JSON_OBJECT_WRITER = new ObjectMapper().writer();
private static final JsonFormat.Printer PROTO_JSON_PRINTER =
JsonFormat.printer().omittingInsignificantWhitespace();
private static final String TEMPORAL_FAILURE_TYPE_STRING =
io.temporal.api.failure.v1.Failure.getDescriptor().getFullName();
Expand Down Expand Up @@ -47,23 +56,134 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink)
.build();
}

public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
public static Failure temporalFailureToNexusFailure(
io.temporal.api.failure.v1.Failure temporalFailure) {
String details;
try {
details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build());
details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Go & Python, we're clearing the stack trace out of the details as well as the message. Seems like it'd make sense to do it here as well.

} catch (InvalidProtocolBufferException e) {
return Failure.newBuilder()
.setMessage("Failed to serialize failure details")
.setDetails(ByteString.copyFromUtf8(e.getMessage()))
.build();
}
return Failure.newBuilder()
.setMessage(failure.getMessage())
.setDetails(ByteString.copyFromUtf8(details))
.putAllMetadata(NEXUS_FAILURE_METADATA)
Failure.Builder failureBuilder =
Failure.newBuilder()
.setMessage(temporalFailure.getMessage())
.setDetails(ByteString.copyFromUtf8(details))
.putAllMetadata(NEXUS_FAILURE_METADATA);
if (!temporalFailure.getStackTrace().isEmpty()) {
failureBuilder.setStackTrace(temporalFailure.getStackTrace());
}
return failureBuilder.build();
}

public static io.temporal.api.failure.v1.Failure nexusFailureToAPIFailure(
FailureInfo failureInfo, boolean retryable) {
io.temporal.api.failure.v1.Failure.Builder apiFailure =
io.temporal.api.failure.v1.Failure.newBuilder();

if (failureInfo.getMetadata().containsKey("type")
&& failureInfo.getMetadata().get("type").equals(TEMPORAL_FAILURE_TYPE_STRING)) {
// Details contains a JSON-serialized Temporal failure
try {
JsonFormat.parser().ignoringUnknownFields().merge(failureInfo.getDetailsJson(), apiFailure);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
} else {
// Create an ApplicationFailure with the Nexus failure data
io.temporal.api.common.v1.Payloads payloads = nexusFailureMetadataToPayloads(failureInfo);
io.temporal.api.failure.v1.ApplicationFailureInfo.Builder appFailureInfo =
io.temporal.api.failure.v1.ApplicationFailureInfo.newBuilder()
.setType("NexusFailure")
.setNonRetryable(!retryable);
if (payloads != null) {
appFailureInfo.setDetails(payloads);
}
apiFailure.setApplicationFailureInfo(appFailureInfo.build());
}

// Ensure these always get written
apiFailure.setMessage(failureInfo.getMessage());

return apiFailure.build();
}

private static io.temporal.api.common.v1.Payloads nexusFailureMetadataToPayloads(
FailureInfo failureInfo) {
if (failureInfo.getMetadata().isEmpty() && failureInfo.getDetailsJson().isEmpty()) {
return null;
}

// Create a copy without the message before serializing
FailureInfo failureCopy = FailureInfo.newBuilder(failureInfo).setMessage("").build();
String json = null;
try {
json = JSON_OBJECT_WRITER.writeValueAsString(failureCopy);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

return io.temporal.api.common.v1.Payloads.newBuilder()
.addPayloads(
Payload.newBuilder()
.putMetadata("encoding", ByteString.copyFromUtf8("json/plain"))
.setData(ByteString.copyFromUtf8(json))
.build())
.build();
}

public static FailureInfo temporalFailureToNexusFailureInfo(
io.temporal.api.failure.v1.Failure temporalFailure) {
String details;
try {
details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build());
} catch (InvalidProtocolBufferException e) {
return FailureInfo.newBuilder()
.setMessage("Failed to serialize failure details")
.setDetailsJson(e.getMessage())
.build();
}
return FailureInfo.newBuilder()
.setMessage(temporalFailure.getMessage())
.setDetailsJson(details)
.putMetadata("type", TEMPORAL_FAILURE_TYPE_STRING)
.build();
}

public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
return temporalFailureToNexusFailure(failure);
}

public static HandlerError handlerErrorToNexusError(
HandlerException e, DataConverter dataConverter) {
HandlerError.Builder handlerError =
HandlerError.newBuilder()
.setErrorType(e.getErrorType().toString())
.setRetryBehavior(mapRetryBehavior(e.getRetryBehavior()));
// TODO: check if this works on old server

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we checked this? If so, let's remove this todo

if (e.getCause() != null) {
handlerError.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter));
} else if (e.getMessage() != null && !e.getMessage().isEmpty()) {
// Include message even when there's no cause
handlerError.setFailure(Failure.newBuilder().setMessage(e.getMessage()).build());
}
return handlerError.build();
}

private static NexusHandlerErrorRetryBehavior mapRetryBehavior(
HandlerException.RetryBehavior retryBehavior) {
switch (retryBehavior) {
case RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
case NON_RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
default:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
}
}

private NexusUtil() {}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package io.temporal.internal.nexus;

import static io.temporal.internal.common.NexusUtil.exceptionToNexusFailure;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import com.uber.m3.tally.Scope;
import io.grpc.StatusRuntimeException;
import io.nexusrpc.Header;
import io.nexusrpc.OperationException;
import io.nexusrpc.OperationState;
import io.nexusrpc.handler.*;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
import io.temporal.api.nexus.v1.*;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowNotFoundException;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.interceptors.WorkerInterceptor;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
Expand Down Expand Up @@ -78,9 +80,6 @@ public boolean start() {
public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException {
Request request = task.getResponse().getRequest();
Map<String, String> headers = request.getHeaderMap();
if (headers == null) {
headers = Collections.emptyMap();
}

OperationContext.Builder ctx = OperationContext.newBuilder();
headers.forEach(ctx::putHeader);
Expand Down Expand Up @@ -129,18 +128,9 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
new RuntimeException("Unknown request type: " + request.getVariantCase()));
}
} catch (HandlerException e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(e.getErrorType().toString())
.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter))
.setRetryBehavior(mapRetryBehavior(e.getRetryBehavior()))
.build());
return new Result(e);
} catch (Throwable e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(HandlerException.ErrorType.INTERNAL.toString())
.setFailure(exceptionToNexusFailure(e, dataConverter))
.build());
return new Result(new HandlerException(HandlerException.ErrorType.INTERNAL, e));
} finally {
// If the task timed out, we should not send a response back to the server
if (timedOut.get()) {
Expand All @@ -154,18 +144,6 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}
}

private NexusHandlerErrorRetryBehavior mapRetryBehavior(
HandlerException.RetryBehavior retryBehavior) {
switch (retryBehavior) {
case RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
case NON_RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
default:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
}
}

private void cancelOperation(OperationContext context, OperationCancelDetails details) {
try {
serviceHandler.cancelOperation(context, details);
Expand Down Expand Up @@ -215,6 +193,7 @@ private void convertKnownFailures(Throwable e) {
if (((ApplicationFailure) failure).isNonRetryable()) {
throw new HandlerException(
HandlerException.ErrorType.INTERNAL,
"Handler failed with non-retryable application error",
failure,
HandlerException.RetryBehavior.NON_RETRYABLE);
}
Expand Down Expand Up @@ -346,11 +325,21 @@ private StartOperationResponse handleStartOperation(
convertKnownFailures(failure);
}
} catch (OperationException e) {
startResponseBuilder.setOperationError(
UnsuccessfulOperationError.newBuilder()
.setOperationState(e.getState().toString().toLowerCase())
.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter))
.build());
TemporalFailure temporalFailure;
if (e.getState() == OperationState.FAILED) {
temporalFailure =
ApplicationFailure.newFailureWithCause(e.getMessage(), "OperationError", e.getCause());
temporalFailure.setStackTrace(e.getStackTrace());
} else if (e.getState() == OperationState.CANCELED) {
temporalFailure =
new CanceledFailure(e.getMessage(), new EncodedValues(null), e.getCause());
temporalFailure.setStackTrace(e.getStackTrace());
} else {
throw new HandlerException(
HandlerException.ErrorType.INTERNAL,
new RuntimeException("Unknown operation state: " + e.getState()));
}
startResponseBuilder.setFailure(dataConverter.exceptionToFailure(temporalFailure));
}
return startResponseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.temporal.internal.nexus;

import io.nexusrpc.OperationException;
import io.nexusrpc.OperationInfo;
import io.nexusrpc.handler.*;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkerInterceptor;
Expand Down Expand Up @@ -51,20 +50,6 @@ public OperationStartResult<Object> start(
.getResult();
}

@Override
public Object fetchResult(
OperationContext operationContext, OperationFetchResultDetails operationFetchResultDetails)
throws OperationException {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public OperationInfo fetchInfo(
OperationContext operationContext, OperationFetchInfoDetails operationFetchInfoDetails)
throws HandlerException {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public void cancel(
OperationContext operationContext, OperationCancelDetails operationCancelDetails) {
Expand Down
Loading