-
Notifications
You must be signed in to change notification settings - Fork 196
💥 Use Temporal Failures for Nexus Error Serialization #2773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
296f927
8607ddb
0dec89d
3b8fe5e
4219704
3495885
3d44cf5
9545fd2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ plugins { | |
|
|
||
| allprojects { | ||
| repositories { | ||
| mavenLocal() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SNAPSHOT dependency and mavenLocal not suitable for mergeMedium Severity
Additional Locations (1) |
||
| mavenCentral() | ||
| } | ||
| } | ||
|
|
@@ -30,7 +31,7 @@ ext { | |
| // Platforms | ||
| grpcVersion = '1.75.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager | ||
| jacksonVersion = '2.15.4' // [2.9.0,) | ||
| nexusVersion = '0.4.0-alpha' | ||
| nexusVersion = '0.5.0-SNAPSHOT' | ||
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. | ||
| micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| import io.temporal.common.converter.FailureConverter; | ||
| import io.temporal.internal.activity.ActivityTaskHandlerImpl; | ||
| import io.temporal.internal.common.FailureUtils; | ||
| import io.temporal.internal.common.NexusUtil; | ||
| import io.temporal.internal.common.ProtobufTimeUtils; | ||
| import io.temporal.internal.sync.POJOWorkflowImplementationFactory; | ||
| import io.temporal.serviceclient.CheckedExceptionWrapper; | ||
|
|
@@ -192,7 +193,18 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d | |
| retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE; | ||
| break; | ||
| } | ||
| return new HandlerException(info.getType(), cause, retryBehavior); | ||
| if (failure | ||
| .getMessage() | ||
| .startsWith(String.format("handler error (%s)", info.getType()))) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand this is here to allow proper decoding of legacy errors, but for which languages exactly? If we mean to support legacy errors coming from Go or others, are we sure the casing returned by
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any old failure serialized by Go or Java. This is the handler error type so that should match the Nexus spec and consistent across Go and Java |
||
| return new HandlerException(info.getType(), cause, retryBehavior); | ||
| } else { | ||
| return new HandlerException( | ||
| info.getType(), | ||
| failure.getMessage(), | ||
| cause, | ||
| retryBehavior, | ||
| NexusUtil.temporalFailureToNexusFailureInfo(failure)); | ||
| } | ||
| } | ||
| case FAILUREINFO_NOT_SET: | ||
| default: | ||
|
|
@@ -324,6 +336,9 @@ private Failure exceptionToFailure(Throwable throwable) { | |
| failure.setNexusOperationExecutionFailureInfo(op); | ||
| } else if (throwable instanceof HandlerException) { | ||
| HandlerException he = (HandlerException) throwable; | ||
| if (he.getOriginalFailure() != null) { | ||
| return NexusUtil.nexusFailureToAPIFailure(he.getOriginalFailure(), true); | ||
| } | ||
| NexusHandlerErrorRetryBehavior retryBehavior = | ||
| NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED; | ||
| switch (he.getRetryBehavior()) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,18 @@ | ||
| package io.temporal.internal.common; | ||
|
|
||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.fasterxml.jackson.databind.ObjectWriter; | ||
| import com.google.protobuf.ByteString; | ||
| import com.google.protobuf.InvalidProtocolBufferException; | ||
| import com.google.protobuf.util.JsonFormat; | ||
| import io.nexusrpc.FailureInfo; | ||
| import io.nexusrpc.Link; | ||
| import io.nexusrpc.handler.HandlerException; | ||
| import io.temporal.api.common.v1.Payload; | ||
| import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior; | ||
| import io.temporal.api.nexus.v1.Failure; | ||
| import io.temporal.api.nexus.v1.HandlerError; | ||
| import io.temporal.common.converter.DataConverter; | ||
| import java.net.URI; | ||
| import java.net.URISyntaxException; | ||
|
|
@@ -13,7 +21,8 @@ | |
| import java.util.Map; | ||
|
|
||
| public class NexusUtil { | ||
| private static final JsonFormat.Printer JSON_PRINTER = | ||
| private static final ObjectWriter JSON_OBJECT_WRITER = new ObjectMapper().writer(); | ||
| private static final JsonFormat.Printer PROTO_JSON_PRINTER = | ||
| JsonFormat.printer().omittingInsignificantWhitespace(); | ||
| private static final String TEMPORAL_FAILURE_TYPE_STRING = | ||
| io.temporal.api.failure.v1.Failure.getDescriptor().getFullName(); | ||
|
|
@@ -47,23 +56,134 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink) | |
| .build(); | ||
| } | ||
|
|
||
| public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) { | ||
| io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception); | ||
| public static Failure temporalFailureToNexusFailure( | ||
| io.temporal.api.failure.v1.Failure temporalFailure) { | ||
| String details; | ||
| try { | ||
| details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build()); | ||
| details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Go & Python, we're clearing the stack trace out of the details as well as the message. Seems like it'd make sense to do it here as well. |
||
| } catch (InvalidProtocolBufferException e) { | ||
| return Failure.newBuilder() | ||
| .setMessage("Failed to serialize failure details") | ||
| .setDetails(ByteString.copyFromUtf8(e.getMessage())) | ||
| .build(); | ||
| } | ||
| return Failure.newBuilder() | ||
| .setMessage(failure.getMessage()) | ||
| .setDetails(ByteString.copyFromUtf8(details)) | ||
| .putAllMetadata(NEXUS_FAILURE_METADATA) | ||
| Failure.Builder failureBuilder = | ||
| Failure.newBuilder() | ||
| .setMessage(temporalFailure.getMessage()) | ||
| .setDetails(ByteString.copyFromUtf8(details)) | ||
| .putAllMetadata(NEXUS_FAILURE_METADATA); | ||
| if (!temporalFailure.getStackTrace().isEmpty()) { | ||
| failureBuilder.setStackTrace(temporalFailure.getStackTrace()); | ||
| } | ||
| return failureBuilder.build(); | ||
| } | ||
|
|
||
| public static io.temporal.api.failure.v1.Failure nexusFailureToAPIFailure( | ||
| FailureInfo failureInfo, boolean retryable) { | ||
| io.temporal.api.failure.v1.Failure.Builder apiFailure = | ||
| io.temporal.api.failure.v1.Failure.newBuilder(); | ||
|
|
||
| if (failureInfo.getMetadata().containsKey("type") | ||
| && failureInfo.getMetadata().get("type").equals(TEMPORAL_FAILURE_TYPE_STRING)) { | ||
| // Details contains a JSON-serialized Temporal failure | ||
| try { | ||
| JsonFormat.parser().ignoringUnknownFields().merge(failureInfo.getDetailsJson(), apiFailure); | ||
| } catch (InvalidProtocolBufferException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } else { | ||
| // Create an ApplicationFailure with the Nexus failure data | ||
| io.temporal.api.common.v1.Payloads payloads = nexusFailureMetadataToPayloads(failureInfo); | ||
| io.temporal.api.failure.v1.ApplicationFailureInfo.Builder appFailureInfo = | ||
| io.temporal.api.failure.v1.ApplicationFailureInfo.newBuilder() | ||
| .setType("NexusFailure") | ||
| .setNonRetryable(!retryable); | ||
| if (payloads != null) { | ||
| appFailureInfo.setDetails(payloads); | ||
| } | ||
| apiFailure.setApplicationFailureInfo(appFailureInfo.build()); | ||
| } | ||
|
|
||
| // Ensure these always get written | ||
| apiFailure.setMessage(failureInfo.getMessage()); | ||
|
|
||
| return apiFailure.build(); | ||
| } | ||
|
|
||
| private static io.temporal.api.common.v1.Payloads nexusFailureMetadataToPayloads( | ||
| FailureInfo failureInfo) { | ||
| if (failureInfo.getMetadata().isEmpty() && failureInfo.getDetailsJson().isEmpty()) { | ||
| return null; | ||
| } | ||
|
|
||
| // Create a copy without the message before serializing | ||
| FailureInfo failureCopy = FailureInfo.newBuilder(failureInfo).setMessage("").build(); | ||
| String json = null; | ||
| try { | ||
| json = JSON_OBJECT_WRITER.writeValueAsString(failureCopy); | ||
| } catch (JsonProcessingException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
|
|
||
| return io.temporal.api.common.v1.Payloads.newBuilder() | ||
| .addPayloads( | ||
| Payload.newBuilder() | ||
| .putMetadata("encoding", ByteString.copyFromUtf8("json/plain")) | ||
| .setData(ByteString.copyFromUtf8(json)) | ||
| .build()) | ||
| .build(); | ||
| } | ||
|
|
||
| public static FailureInfo temporalFailureToNexusFailureInfo( | ||
| io.temporal.api.failure.v1.Failure temporalFailure) { | ||
| String details; | ||
| try { | ||
| details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build()); | ||
| } catch (InvalidProtocolBufferException e) { | ||
| return FailureInfo.newBuilder() | ||
| .setMessage("Failed to serialize failure details") | ||
| .setDetailsJson(e.getMessage()) | ||
| .build(); | ||
| } | ||
| return FailureInfo.newBuilder() | ||
| .setMessage(temporalFailure.getMessage()) | ||
| .setDetailsJson(details) | ||
| .putMetadata("type", TEMPORAL_FAILURE_TYPE_STRING) | ||
| .build(); | ||
| } | ||
|
|
||
| public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) { | ||
| io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception); | ||
| return temporalFailureToNexusFailure(failure); | ||
| } | ||
|
|
||
| public static HandlerError handlerErrorToNexusError( | ||
| HandlerException e, DataConverter dataConverter) { | ||
| HandlerError.Builder handlerError = | ||
| HandlerError.newBuilder() | ||
| .setErrorType(e.getErrorType().toString()) | ||
| .setRetryBehavior(mapRetryBehavior(e.getRetryBehavior())); | ||
| // TODO: check if this works on old server | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we checked this? If so, let's remove this todo |
||
| if (e.getCause() != null) { | ||
| handlerError.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter)); | ||
| } else if (e.getMessage() != null && !e.getMessage().isEmpty()) { | ||
| // Include message even when there's no cause | ||
| handlerError.setFailure(Failure.newBuilder().setMessage(e.getMessage()).build()); | ||
| } | ||
| return handlerError.build(); | ||
| } | ||
|
|
||
| private static NexusHandlerErrorRetryBehavior mapRetryBehavior( | ||
| HandlerException.RetryBehavior retryBehavior) { | ||
| switch (retryBehavior) { | ||
| case RETRYABLE: | ||
| return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE; | ||
| case NON_RETRYABLE: | ||
| return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE; | ||
| default: | ||
| return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED; | ||
| } | ||
| } | ||
|
|
||
| private NexusUtil() {} | ||
| } | ||


Uh oh!
There was an error while loading. Please reload this page.