Skip to content

Commit f76429a

Browse files
committed
Fix ContinueAsNew dropping memo and search attributes in test server
1 parent 5a765b1 commit f76429a

5 files changed

Lines changed: 318 additions & 2 deletions

File tree

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,8 +1462,25 @@ private static void continueAsNewWorkflow(
14621462
if (d.hasFailure()) {
14631463
a.setFailure(d.getFailure());
14641464
}
1465+
// For both search attributes and memo below: a present-but-empty command field is
1466+
// preserved as an explicit empty override; only an absent command field falls through
1467+
// to inheriting the live workflow state.
14651468
if (d.hasSearchAttributes()) {
14661469
a.setSearchAttributes(d.getSearchAttributes());
1470+
} else {
1471+
SearchAttributes currentSearchAttributes =
1472+
ctx.getWorkflowMutableState().getCurrentSearchAttributes();
1473+
if (currentSearchAttributes.getIndexedFieldsCount() > 0) {
1474+
a.setSearchAttributes(currentSearchAttributes);
1475+
}
1476+
}
1477+
if (d.hasMemo()) {
1478+
a.setMemo(d.getMemo());
1479+
} else {
1480+
Memo currentMemo = ctx.getWorkflowMutableState().getCurrentMemo();
1481+
if (currentMemo.getFieldsCount() > 0) {
1482+
a.setMemo(currentMemo);
1483+
}
14671484
}
14681485
a.setNewExecutionRunId(UUID.randomUUID().toString());
14691486
HistoryEvent event =

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import io.grpc.Deadline;
44
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
55
import io.temporal.api.common.v1.Callback;
6+
import io.temporal.api.common.v1.Memo;
67
import io.temporal.api.common.v1.Payload;
78
import io.temporal.api.common.v1.Payloads;
9+
import io.temporal.api.common.v1.SearchAttributes;
810
import io.temporal.api.enums.v1.EventType;
911
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
1012
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
@@ -32,6 +34,15 @@ interface TestWorkflowMutableState {
3234

3335
StartWorkflowExecutionRequest getStartRequest();
3436

37+
/** Returns the current memo, reflecting any upserts since workflow start. */
38+
Memo getCurrentMemo();
39+
40+
/**
41+
* Returns the current search attributes from the visibility store, reflecting any upserts since
42+
* workflow start.
43+
*/
44+
SearchAttributes getCurrentSearchAttributes();
45+
3546
void startWorkflowTask(
3647
PollWorkflowTaskQueueResponse.Builder task, PollWorkflowTaskQueueRequest pollRequest);
3748

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3271,10 +3271,18 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() {
32713271
}
32723272
}
32733273

3274-
private Memo getCurrentMemo() {
3274+
@Override
3275+
public Memo getCurrentMemo() {
32753276
return Memo.newBuilder().putAllFields(currentMemo).build();
32763277
}
32773278

3279+
@Override
3280+
public SearchAttributes getCurrentSearchAttributes() {
3281+
SearchAttributes searchAttributes =
3282+
visibilityStore.getSearchAttributesForExecution(executionId);
3283+
return searchAttributes == null ? SearchAttributes.getDefaultInstance() : searchAttributes;
3284+
}
3285+
32783286
private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() {
32793287
WorkflowExecutionConfig.Builder executionConfig =
32803288
WorkflowExecutionConfig.newBuilder()

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,6 +1757,9 @@ public String continueAsNew(
17571757
if (ea.hasSearchAttributes()) {
17581758
startRequestBuilder.setSearchAttributes(ea.getSearchAttributes());
17591759
}
1760+
if (ea.hasMemo()) {
1761+
startRequestBuilder.setMemo(ea.getMemo());
1762+
}
17601763
StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
17611764
lock.lock();
17621765
Optional<Failure> lastFail =

temporal-test-server/src/test/java/io/temporal/testserver/functional/ContinueAsNewTest.java

Lines changed: 278 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
11
package io.temporal.testserver.functional;
22

3+
import com.google.common.collect.ImmutableMap;
4+
import io.temporal.api.common.v1.Payload;
35
import io.temporal.api.common.v1.WorkflowExecution;
6+
import io.temporal.api.enums.v1.EventType;
7+
import io.temporal.api.history.v1.HistoryEvent;
8+
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
49
import io.temporal.client.WorkflowOptions;
510
import io.temporal.client.WorkflowStub;
11+
import io.temporal.common.SearchAttributeKey;
12+
import io.temporal.common.SearchAttributes;
613
import io.temporal.common.WorkflowExecutionHistory;
14+
import io.temporal.common.converter.DefaultDataConverter;
715
import io.temporal.common.interceptors.*;
816
import io.temporal.testing.internal.SDKTestWorkflowRule;
917
import io.temporal.testserver.functional.common.TestWorkflows;
1018
import io.temporal.worker.WorkerFactoryOptions;
1119
import io.temporal.workflow.ContinueAsNewOptions;
1220
import io.temporal.workflow.Workflow;
21+
import io.temporal.workflow.WorkflowInterface;
22+
import io.temporal.workflow.WorkflowMethod;
1323
import java.time.Duration;
24+
import java.util.Collections;
1425
import org.junit.Assert;
1526
import org.junit.Rule;
1627
import org.junit.Test;
@@ -24,7 +35,13 @@ public class ContinueAsNewTest {
2435
WorkerFactoryOptions.newBuilder()
2536
.setWorkerInterceptors(new StripsTqFromCanInterceptor())
2637
.build())
27-
.setWorkflowTypes(TestWorkflow.class)
38+
.setWorkflowTypes(
39+
TestWorkflow.class,
40+
OverridingWorkflow.class,
41+
UpsertingSearchAttributesWorkflow.class,
42+
UpsertingMemoWorkflow.class,
43+
ClearingMemoWorkflow.class,
44+
RemovingMemoWorkflow.class)
2845
.build();
2946

3047
@Test
@@ -54,6 +71,173 @@ public void repeatedFailure() {
5471
.isEmpty());
5572
}
5673

74+
private static final SearchAttributeKey<String> CUSTOM_KEYWORD =
75+
SearchAttributeKey.forKeyword("CustomKeywordField");
76+
77+
@Test
78+
public void inheritsMemoAndSearchAttributesAcrossContinueAsNew() {
79+
WorkflowOptions options =
80+
WorkflowOptions.newBuilder()
81+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
82+
.setTaskQueue(testWorkflowRule.getTaskQueue())
83+
.setMemo(ImmutableMap.of("memoKey", "memoValue"))
84+
.setTypedSearchAttributes(
85+
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "initialSA").build())
86+
.build();
87+
88+
TestWorkflows.WorkflowTakesBool workflowStub =
89+
testWorkflowRule
90+
.getWorkflowClient()
91+
.newWorkflowStub(TestWorkflows.WorkflowTakesBool.class, options);
92+
workflowStub.execute(true);
93+
94+
WorkflowExecutionStartedEventAttributes started =
95+
getContinuedRunStartedAttributes(workflowStub);
96+
97+
Assert.assertTrue("Memo should be inherited by the continued run", started.hasMemo());
98+
Assert.assertEquals("memoValue", decodeString(started.getMemo().getFieldsOrThrow("memoKey")));
99+
100+
Assert.assertTrue(
101+
"Search attributes should be inherited by the continued run",
102+
started.hasSearchAttributes());
103+
Assert.assertEquals(
104+
"initialSA",
105+
decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField")));
106+
}
107+
108+
@Test
109+
public void overridesMemoAndSearchAttributesOnContinueAsNew() {
110+
WorkflowOptions options =
111+
WorkflowOptions.newBuilder()
112+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
113+
.setTaskQueue(testWorkflowRule.getTaskQueue())
114+
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
115+
.setTypedSearchAttributes(
116+
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "originalSA").build())
117+
.build();
118+
119+
OverridingWorkflowInterface workflowStub =
120+
testWorkflowRule
121+
.getWorkflowClient()
122+
.newWorkflowStub(OverridingWorkflowInterface.class, options);
123+
workflowStub.execute(true);
124+
125+
WorkflowExecutionStartedEventAttributes started =
126+
getContinuedRunStartedAttributes(workflowStub);
127+
128+
Assert.assertEquals(
129+
"overriddenMemo", decodeString(started.getMemo().getFieldsOrThrow("memoKey")));
130+
131+
Assert.assertEquals(
132+
"overriddenSA",
133+
decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField")));
134+
}
135+
136+
@Test
137+
public void inheritsUpsertedMemoAcrossContinueAsNew() {
138+
WorkflowOptions options =
139+
WorkflowOptions.newBuilder()
140+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
141+
.setTaskQueue(testWorkflowRule.getTaskQueue())
142+
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
143+
.build();
144+
145+
UpsertingMemoWorkflowInterface workflowStub =
146+
testWorkflowRule
147+
.getWorkflowClient()
148+
.newWorkflowStub(UpsertingMemoWorkflowInterface.class, options);
149+
workflowStub.execute(true);
150+
151+
WorkflowExecutionStartedEventAttributes started =
152+
getContinuedRunStartedAttributes(workflowStub);
153+
154+
Assert.assertEquals(
155+
"upsertedMemo", decodeString(started.getMemo().getFieldsOrThrow("memoKey")));
156+
}
157+
158+
@Test
159+
public void inheritsUpsertedSearchAttributesAcrossContinueAsNewWhenCommandOmitsThem() {
160+
WorkflowOptions options =
161+
WorkflowOptions.newBuilder()
162+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
163+
.setTaskQueue(testWorkflowRule.getTaskQueue())
164+
.setTypedSearchAttributes(
165+
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "originalSA").build())
166+
.build();
167+
168+
UpsertingSearchAttributesWorkflowInterface workflowStub =
169+
testWorkflowRule
170+
.getWorkflowClient()
171+
.newWorkflowStub(UpsertingSearchAttributesWorkflowInterface.class, options);
172+
workflowStub.execute(true);
173+
174+
WorkflowExecutionStartedEventAttributes started =
175+
getContinuedRunStartedAttributes(workflowStub);
176+
177+
Assert.assertEquals(
178+
"upsertedSA",
179+
decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField")));
180+
}
181+
182+
@Test
183+
public void doesNotReinheritRemovedMemoAcrossContinueAsNew() {
184+
WorkflowOptions options =
185+
WorkflowOptions.newBuilder()
186+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
187+
.setTaskQueue(testWorkflowRule.getTaskQueue())
188+
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
189+
.build();
190+
191+
RemovingMemoWorkflowInterface workflowStub =
192+
testWorkflowRule
193+
.getWorkflowClient()
194+
.newWorkflowStub(RemovingMemoWorkflowInterface.class, options);
195+
workflowStub.execute(true);
196+
197+
WorkflowExecutionStartedEventAttributes started =
198+
getContinuedRunStartedAttributes(workflowStub);
199+
Assert.assertFalse(started.getMemo().containsFields("memoKey"));
200+
}
201+
202+
@Test
203+
public void overridesMemoWithEmptyMemoOnContinueAsNew() {
204+
WorkflowOptions options =
205+
WorkflowOptions.newBuilder()
206+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
207+
.setTaskQueue(testWorkflowRule.getTaskQueue())
208+
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
209+
.build();
210+
211+
ClearingMemoWorkflowInterface workflowStub =
212+
testWorkflowRule
213+
.getWorkflowClient()
214+
.newWorkflowStub(ClearingMemoWorkflowInterface.class, options);
215+
workflowStub.execute(true);
216+
217+
WorkflowExecutionStartedEventAttributes started =
218+
getContinuedRunStartedAttributes(workflowStub);
219+
Assert.assertTrue("Empty memo should be an explicit override", started.hasMemo());
220+
Assert.assertEquals(0, started.getMemo().getFieldsCount());
221+
}
222+
223+
private WorkflowExecutionStartedEventAttributes getContinuedRunStartedAttributes(
224+
Object workflowStub) {
225+
WorkflowExecution execution = WorkflowStub.fromTyped(workflowStub).getExecution();
226+
HistoryEvent firstEvent =
227+
testWorkflowRule.getExecutionHistory(execution.getWorkflowId()).getEvents().get(0);
228+
Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, firstEvent.getEventType());
229+
WorkflowExecutionStartedEventAttributes started =
230+
firstEvent.getWorkflowExecutionStartedEventAttributes();
231+
Assert.assertFalse(
232+
"Inspected event must belong to the continued run",
233+
started.getContinuedExecutionRunId().isEmpty());
234+
return started;
235+
}
236+
237+
private static String decodeString(Payload payload) {
238+
return DefaultDataConverter.STANDARD_INSTANCE.fromPayload(payload, String.class, String.class);
239+
}
240+
57241
public static class TestWorkflow implements TestWorkflows.WorkflowTakesBool {
58242
@Override
59243
public void execute(boolean doContinue) {
@@ -63,6 +247,99 @@ public void execute(boolean doContinue) {
63247
}
64248
}
65249

250+
@WorkflowInterface
251+
public interface OverridingWorkflowInterface {
252+
@WorkflowMethod
253+
void execute(boolean doContinue);
254+
}
255+
256+
public static class OverridingWorkflow implements OverridingWorkflowInterface {
257+
@Override
258+
public void execute(boolean doContinue) {
259+
if (doContinue) {
260+
Workflow.continueAsNew(
261+
ContinueAsNewOptions.newBuilder()
262+
.setMemo(ImmutableMap.of("memoKey", "overriddenMemo"))
263+
.setTypedSearchAttributes(
264+
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "overriddenSA").build())
265+
.build(),
266+
false);
267+
}
268+
}
269+
}
270+
271+
@WorkflowInterface
272+
public interface UpsertingSearchAttributesWorkflowInterface {
273+
@WorkflowMethod
274+
void execute(boolean doContinue);
275+
}
276+
277+
public static class UpsertingSearchAttributesWorkflow
278+
implements UpsertingSearchAttributesWorkflowInterface {
279+
@Override
280+
public void execute(boolean doContinue) {
281+
if (doContinue) {
282+
Workflow.upsertTypedSearchAttributes(CUSTOM_KEYWORD.valueSet("upsertedSA"));
283+
// Empty typed search attributes are not encoded into the command, so this exercises
284+
// server-side inheritance after the upsert.
285+
Workflow.continueAsNew(
286+
ContinueAsNewOptions.newBuilder()
287+
.setTypedSearchAttributes(SearchAttributes.EMPTY)
288+
.build(),
289+
false);
290+
}
291+
}
292+
}
293+
294+
@WorkflowInterface
295+
public interface UpsertingMemoWorkflowInterface {
296+
@WorkflowMethod
297+
void execute(boolean doContinue);
298+
}
299+
300+
public static class UpsertingMemoWorkflow implements UpsertingMemoWorkflowInterface {
301+
@Override
302+
public void execute(boolean doContinue) {
303+
if (doContinue) {
304+
Workflow.upsertMemo(ImmutableMap.of("memoKey", "upsertedMemo"));
305+
Workflow.continueAsNew(false);
306+
}
307+
}
308+
}
309+
310+
@WorkflowInterface
311+
public interface ClearingMemoWorkflowInterface {
312+
@WorkflowMethod
313+
void execute(boolean doContinue);
314+
}
315+
316+
public static class ClearingMemoWorkflow implements ClearingMemoWorkflowInterface {
317+
@Override
318+
public void execute(boolean doContinue) {
319+
if (doContinue) {
320+
Workflow.continueAsNew(
321+
ContinueAsNewOptions.newBuilder().setMemo(Collections.emptyMap()).build(), false);
322+
}
323+
}
324+
}
325+
326+
@WorkflowInterface
327+
public interface RemovingMemoWorkflowInterface {
328+
@WorkflowMethod
329+
void execute(boolean doContinue);
330+
}
331+
332+
public static class RemovingMemoWorkflow implements RemovingMemoWorkflowInterface {
333+
@Override
334+
public void execute(boolean doContinue) {
335+
if (doContinue) {
336+
// ImmutableMap does not allow null values, and null removes the memo field.
337+
Workflow.upsertMemo(Collections.singletonMap("memoKey", null));
338+
Workflow.continueAsNew(false);
339+
}
340+
}
341+
}
342+
66343
// Verify that we can strip the TQ name and test server continues onto same TQ
67344
private static class StripsTqFromCanInterceptor extends WorkerInterceptorBase {
68345
@Override

0 commit comments

Comments
 (0)