diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 0cc7c52ec..27c8c3fc5 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -1462,8 +1462,25 @@ private static void continueAsNewWorkflow( if (d.hasFailure()) { a.setFailure(d.getFailure()); } + // For both search attributes and memo below: a present-but-empty command field is + // preserved as an explicit empty override; only an absent command field falls through + // to inheriting the live workflow state. if (d.hasSearchAttributes()) { a.setSearchAttributes(d.getSearchAttributes()); + } else { + SearchAttributes currentSearchAttributes = + ctx.getWorkflowMutableState().getCurrentSearchAttributes(); + if (currentSearchAttributes.getIndexedFieldsCount() > 0) { + a.setSearchAttributes(currentSearchAttributes); + } + } + if (d.hasMemo()) { + a.setMemo(d.getMemo()); + } else { + Memo currentMemo = ctx.getWorkflowMutableState().getCurrentMemo(); + if (currentMemo.getFieldsCount() > 0) { + a.setMemo(currentMemo); + } } a.setNewExecutionRunId(UUID.randomUUID().toString()); HistoryEvent event = diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index 41c994727..8ebda39ed 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -3,8 +3,10 @@ import io.grpc.Deadline; import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; import io.temporal.api.common.v1.Callback; +import io.temporal.api.common.v1.Memo; import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.SearchAttributes; import io.temporal.api.enums.v1.EventType; import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause; import io.temporal.api.enums.v1.WorkflowExecutionStatus; @@ -32,6 +34,15 @@ interface TestWorkflowMutableState { StartWorkflowExecutionRequest getStartRequest(); + /** Returns the current memo, reflecting any upserts since workflow start. */ + Memo getCurrentMemo(); + + /** + * Returns the current search attributes from the visibility store, reflecting any upserts since + * workflow start. + */ + SearchAttributes getCurrentSearchAttributes(); + void startWorkflowTask( PollWorkflowTaskQueueResponse.Builder task, PollWorkflowTaskQueueRequest pollRequest); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index a1cf4e111..1d2a01541 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -3271,10 +3271,18 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() { } } - private Memo getCurrentMemo() { + @Override + public Memo getCurrentMemo() { return Memo.newBuilder().putAllFields(currentMemo).build(); } + @Override + public SearchAttributes getCurrentSearchAttributes() { + SearchAttributes searchAttributes = + visibilityStore.getSearchAttributesForExecution(executionId); + return searchAttributes == null ? SearchAttributes.getDefaultInstance() : searchAttributes; + } + private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() { WorkflowExecutionConfig.Builder executionConfig = WorkflowExecutionConfig.newBuilder() diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index d85220177..b62f021a3 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -1757,6 +1757,9 @@ public String continueAsNew( if (ea.hasSearchAttributes()) { startRequestBuilder.setSearchAttributes(ea.getSearchAttributes()); } + if (ea.hasMemo()) { + startRequestBuilder.setMemo(ea.getMemo()); + } StartWorkflowExecutionRequest startRequest = startRequestBuilder.build(); lock.lock(); Optional lastFail = diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/ContinueAsNewTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/ContinueAsNewTest.java index 6c629b607..595bb4a4f 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/ContinueAsNewTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/ContinueAsNewTest.java @@ -1,16 +1,27 @@ package io.temporal.testserver.functional; +import com.google.common.collect.ImmutableMap; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.common.SearchAttributeKey; +import io.temporal.common.SearchAttributes; import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.common.converter.DefaultDataConverter; import io.temporal.common.interceptors.*; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testserver.functional.common.TestWorkflows; import io.temporal.worker.WorkerFactoryOptions; import io.temporal.workflow.ContinueAsNewOptions; import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; import java.time.Duration; +import java.util.Collections; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -24,7 +35,13 @@ public class ContinueAsNewTest { WorkerFactoryOptions.newBuilder() .setWorkerInterceptors(new StripsTqFromCanInterceptor()) .build()) - .setWorkflowTypes(TestWorkflow.class) + .setWorkflowTypes( + TestWorkflow.class, + OverridingWorkflow.class, + UpsertingSearchAttributesWorkflow.class, + UpsertingMemoWorkflow.class, + ClearingMemoWorkflow.class, + RemovingMemoWorkflow.class) .build(); @Test @@ -54,6 +71,173 @@ public void repeatedFailure() { .isEmpty()); } + private static final SearchAttributeKey CUSTOM_KEYWORD = + SearchAttributeKey.forKeyword("CustomKeywordField"); + + @Test + public void inheritsMemoAndSearchAttributesAcrossContinueAsNew() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setMemo(ImmutableMap.of("memoKey", "memoValue")) + .setTypedSearchAttributes( + SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "initialSA").build()) + .build(); + + TestWorkflows.WorkflowTakesBool workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(TestWorkflows.WorkflowTakesBool.class, options); + workflowStub.execute(true); + + WorkflowExecutionStartedEventAttributes started = + getContinuedRunStartedAttributes(workflowStub); + + Assert.assertTrue("Memo should be inherited by the continued run", started.hasMemo()); + Assert.assertEquals("memoValue", decodeString(started.getMemo().getFieldsOrThrow("memoKey"))); + + Assert.assertTrue( + "Search attributes should be inherited by the continued run", + started.hasSearchAttributes()); + Assert.assertEquals( + "initialSA", + decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField"))); + } + + @Test + public void overridesMemoAndSearchAttributesOnContinueAsNew() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setMemo(ImmutableMap.of("memoKey", "originalMemo")) + .setTypedSearchAttributes( + SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "originalSA").build()) + .build(); + + OverridingWorkflowInterface workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(OverridingWorkflowInterface.class, options); + workflowStub.execute(true); + + WorkflowExecutionStartedEventAttributes started = + getContinuedRunStartedAttributes(workflowStub); + + Assert.assertEquals( + "overriddenMemo", decodeString(started.getMemo().getFieldsOrThrow("memoKey"))); + + Assert.assertEquals( + "overriddenSA", + decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField"))); + } + + @Test + public void inheritsUpsertedMemoAcrossContinueAsNew() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setMemo(ImmutableMap.of("memoKey", "originalMemo")) + .build(); + + UpsertingMemoWorkflowInterface workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(UpsertingMemoWorkflowInterface.class, options); + workflowStub.execute(true); + + WorkflowExecutionStartedEventAttributes started = + getContinuedRunStartedAttributes(workflowStub); + + Assert.assertEquals( + "upsertedMemo", decodeString(started.getMemo().getFieldsOrThrow("memoKey"))); + } + + @Test + public void inheritsUpsertedSearchAttributesAcrossContinueAsNewWhenCommandOmitsThem() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setTypedSearchAttributes( + SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "originalSA").build()) + .build(); + + UpsertingSearchAttributesWorkflowInterface workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(UpsertingSearchAttributesWorkflowInterface.class, options); + workflowStub.execute(true); + + WorkflowExecutionStartedEventAttributes started = + getContinuedRunStartedAttributes(workflowStub); + + Assert.assertEquals( + "upsertedSA", + decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField"))); + } + + @Test + public void doesNotReinheritRemovedMemoAcrossContinueAsNew() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setMemo(ImmutableMap.of("memoKey", "originalMemo")) + .build(); + + RemovingMemoWorkflowInterface workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(RemovingMemoWorkflowInterface.class, options); + workflowStub.execute(true); + + WorkflowExecutionStartedEventAttributes started = + getContinuedRunStartedAttributes(workflowStub); + Assert.assertFalse(started.getMemo().containsFields("memoKey")); + } + + @Test + public void overridesMemoWithEmptyMemoOnContinueAsNew() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setMemo(ImmutableMap.of("memoKey", "originalMemo")) + .build(); + + ClearingMemoWorkflowInterface workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(ClearingMemoWorkflowInterface.class, options); + workflowStub.execute(true); + + WorkflowExecutionStartedEventAttributes started = + getContinuedRunStartedAttributes(workflowStub); + Assert.assertTrue("Empty memo should be an explicit override", started.hasMemo()); + Assert.assertEquals(0, started.getMemo().getFieldsCount()); + } + + private WorkflowExecutionStartedEventAttributes getContinuedRunStartedAttributes( + Object workflowStub) { + WorkflowExecution execution = WorkflowStub.fromTyped(workflowStub).getExecution(); + HistoryEvent firstEvent = + testWorkflowRule.getExecutionHistory(execution.getWorkflowId()).getEvents().get(0); + Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, firstEvent.getEventType()); + WorkflowExecutionStartedEventAttributes started = + firstEvent.getWorkflowExecutionStartedEventAttributes(); + Assert.assertFalse( + "Inspected event must belong to the continued run", + started.getContinuedExecutionRunId().isEmpty()); + return started; + } + + private static String decodeString(Payload payload) { + return DefaultDataConverter.STANDARD_INSTANCE.fromPayload(payload, String.class, String.class); + } + public static class TestWorkflow implements TestWorkflows.WorkflowTakesBool { @Override public void execute(boolean doContinue) { @@ -63,6 +247,99 @@ public void execute(boolean doContinue) { } } + @WorkflowInterface + public interface OverridingWorkflowInterface { + @WorkflowMethod + void execute(boolean doContinue); + } + + public static class OverridingWorkflow implements OverridingWorkflowInterface { + @Override + public void execute(boolean doContinue) { + if (doContinue) { + Workflow.continueAsNew( + ContinueAsNewOptions.newBuilder() + .setMemo(ImmutableMap.of("memoKey", "overriddenMemo")) + .setTypedSearchAttributes( + SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "overriddenSA").build()) + .build(), + false); + } + } + } + + @WorkflowInterface + public interface UpsertingSearchAttributesWorkflowInterface { + @WorkflowMethod + void execute(boolean doContinue); + } + + public static class UpsertingSearchAttributesWorkflow + implements UpsertingSearchAttributesWorkflowInterface { + @Override + public void execute(boolean doContinue) { + if (doContinue) { + Workflow.upsertTypedSearchAttributes(CUSTOM_KEYWORD.valueSet("upsertedSA")); + // Empty typed search attributes are not encoded into the command, so this exercises + // server-side inheritance after the upsert. + Workflow.continueAsNew( + ContinueAsNewOptions.newBuilder() + .setTypedSearchAttributes(SearchAttributes.EMPTY) + .build(), + false); + } + } + } + + @WorkflowInterface + public interface UpsertingMemoWorkflowInterface { + @WorkflowMethod + void execute(boolean doContinue); + } + + public static class UpsertingMemoWorkflow implements UpsertingMemoWorkflowInterface { + @Override + public void execute(boolean doContinue) { + if (doContinue) { + Workflow.upsertMemo(ImmutableMap.of("memoKey", "upsertedMemo")); + Workflow.continueAsNew(false); + } + } + } + + @WorkflowInterface + public interface ClearingMemoWorkflowInterface { + @WorkflowMethod + void execute(boolean doContinue); + } + + public static class ClearingMemoWorkflow implements ClearingMemoWorkflowInterface { + @Override + public void execute(boolean doContinue) { + if (doContinue) { + Workflow.continueAsNew( + ContinueAsNewOptions.newBuilder().setMemo(Collections.emptyMap()).build(), false); + } + } + } + + @WorkflowInterface + public interface RemovingMemoWorkflowInterface { + @WorkflowMethod + void execute(boolean doContinue); + } + + public static class RemovingMemoWorkflow implements RemovingMemoWorkflowInterface { + @Override + public void execute(boolean doContinue) { + if (doContinue) { + // ImmutableMap does not allow null values, and null removes the memo field. + Workflow.upsertMemo(Collections.singletonMap("memoKey", null)); + Workflow.continueAsNew(false); + } + } + } + // Verify that we can strip the TQ name and test server continues onto same TQ private static class StripsTqFromCanInterceptor extends WorkerInterceptorBase { @Override