diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java b/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java index bb8ad8912..cbfce7e43 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java @@ -18,6 +18,11 @@ public enum SdkFlag { * Changes behavior of CancellationScope to cancel children in a deterministic order. */ DETERMINISTIC_CANCELLATION_SCOPE_ORDER(3), + /* + * Changes behavior of Workflow.await(duration, condition) to cancel the timer if the + * condition is resolved before the timeout. + */ + CANCEL_AWAIT_TIMER_ON_CONDITION(4), UNKNOWN(Integer.MAX_VALUE); private final int value; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 977d9754e..010458b64 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1317,9 +1317,35 @@ public void sleep(Duration duration) { @Override public boolean await(Duration timeout, String reason, Supplier unblockCondition) { - Promise timer = newTimer(timeout); - WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get())); - return !timer.isCompleted(); + // TODO: Change checkSdkFlag to tryUseSdkFlag in the next release to enable this flag by + // default. + boolean cancelTimerOnCondition = + replayContext.checkSdkFlag(SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION); + + if (cancelTimerOnCondition) { + // If condition is already satisfied, skip creating timer + if (unblockCondition.get()) { + return true; + } + // Create timer in a cancellation scope so we can cancel it when condition is satisfied + CompletablePromise timer = Workflow.newPromise(); + CancellationScope timerScope = + Workflow.newCancellationScope(() -> timer.completeFrom(newTimer(timeout))); + timerScope.run(); + + WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get())); + + boolean conditionSatisfied = !timer.isCompleted(); + if (conditionSatisfied) { + timerScope.cancel("await condition resolved"); + } + return conditionSatisfied; + } else { + // Old behavior: timer is not cancelled when condition is satisfied + Promise timer = newTimer(timeout); + WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get())); + return !timer.isCompleted(); + } } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/cancellationTests/WorkflowAwaitCancelTimerOnConditionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/cancellationTests/WorkflowAwaitCancelTimerOnConditionTest.java new file mode 100644 index 000000000..2f5f2caab --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/cancellationTests/WorkflowAwaitCancelTimerOnConditionTest.java @@ -0,0 +1,228 @@ +package io.temporal.workflow.cancellationTests; + +import static org.junit.Assert.*; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; +import io.temporal.internal.common.SdkFlag; +import io.temporal.internal.statemachines.WorkflowStateMachines; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for the CANCEL_AWAIT_TIMER_ON_CONDITION SDK flag behavior. Tests verify both old and new + * behavior by explicitly switching the SDK flag, following the Go SDK pattern. + * + *

Since the flag is NOT auto-enabled (uses checkSdkFlag, not tryUseSdkFlag), tests must + * explicitly add it to initialFlags to enable the new behavior. + */ +public class WorkflowAwaitCancelTimerOnConditionTest { + + private List savedInitialFlags; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestAwaitCancelTimerWorkflowImpl.class, + TestImmediateConditionWorkflowImpl.class, + TestReturnValueWorkflowImpl.class) + .build(); + + @Before + public void setUp() { + savedInitialFlags = WorkflowStateMachines.initialFlags; + } + + @After + public void tearDown() { + WorkflowStateMachines.initialFlags = savedInitialFlags; + } + + /** + * Tests that the timer IS cancelled when the flag is explicitly enabled. With + * CANCEL_AWAIT_TIMER_ON_CONDITION in initialFlags, we expect TIMER_CANCELED in history. + */ + @Test + public void testTimerCancelledWhenFlagEnabled() { + WorkflowStateMachines.initialFlags = + Collections.unmodifiableList( + Arrays.asList( + SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION)); + + TestAwaitWorkflow workflow = testWorkflowRule.newWorkflowStub(TestAwaitWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + testWorkflowRule.sleep(Duration.ofMillis(500)); + workflow.unblock(); + + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + String result = untyped.getResult(String.class); + assertEquals("condition satisfied", result); + + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED); + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_CANCELED); + } + + /** + * Tests that the timer is NOT cancelled when the flag is disabled (default). Without the flag in + * initialFlags, the old behavior is used: timer runs even after condition is satisfied. + */ + @Test + public void testTimerNotCancelledWhenFlagDisabled() { + // Default initialFlags do NOT include CANCEL_AWAIT_TIMER_ON_CONDITION + TestAwaitWorkflow workflow = testWorkflowRule.newWorkflowStub(TestAwaitWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + testWorkflowRule.sleep(Duration.ofMillis(500)); + workflow.unblock(); + + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + String result = untyped.getResult(String.class); + assertEquals("condition satisfied", result); + + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED); + testWorkflowRule.assertNoHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_CANCELED); + } + + /** + * Tests that no timer is created when the condition is immediately true and the flag is enabled. + */ + @Test + public void testNoTimerWhenConditionImmediatelySatisfiedWithFlag() { + WorkflowStateMachines.initialFlags = + Collections.unmodifiableList( + Arrays.asList( + SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION)); + + TestImmediateConditionWorkflow workflow = + testWorkflowRule.newWorkflowStub(TestImmediateConditionWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + String result = untyped.getResult(String.class); + assertEquals("immediate condition", result); + + testWorkflowRule.assertNoHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED); + } + + /** + * Tests that the await returns true when condition is satisfied and false when it times out. This + * verifies the return value semantics are preserved with the new flag. + */ + @Test + public void testAwaitReturnValue() { + WorkflowStateMachines.initialFlags = + Collections.unmodifiableList( + Arrays.asList( + SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION)); + + TestReturnValueWorkflow workflow = + testWorkflowRule.newWorkflowStub(TestReturnValueWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + testWorkflowRule.sleep(Duration.ofMillis(500)); + workflow.unblock(); + + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + String result = untyped.getResult(String.class); + assertEquals("conditionSatisfied=true,timedOut=true", result); + + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_CANCELED); + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_FIRED); + } + + /** + * Tests replay compatibility with old workflow histories that were recorded WITHOUT the + * CANCEL_AWAIT_TIMER_ON_CONDITION flag. + */ + @Test + public void testReplayOldHistoryWithoutFlag() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "awaitTimerConditionOldBehavior.json", TestAwaitCancelTimerWorkflowImpl.class); + } + + @WorkflowInterface + public interface TestAwaitWorkflow { + @WorkflowMethod + String execute(); + + @SignalMethod + void unblock(); + } + + @WorkflowInterface + public interface TestImmediateConditionWorkflow { + @WorkflowMethod + String execute(); + } + + @WorkflowInterface + public interface TestReturnValueWorkflow { + @WorkflowMethod + String execute(); + + @SignalMethod + void unblock(); + } + + public static class TestAwaitCancelTimerWorkflowImpl implements TestAwaitWorkflow { + private boolean unblocked = false; + + @Override + public String execute() { + boolean result = Workflow.await(Duration.ofHours(1), () -> unblocked); + return result ? "condition satisfied" : "timed out"; + } + + @Override + public void unblock() { + unblocked = true; + } + } + + public static class TestImmediateConditionWorkflowImpl implements TestImmediateConditionWorkflow { + @Override + public String execute() { + boolean result = Workflow.await(Duration.ofHours(1), () -> true); + return result ? "immediate condition" : "unexpected timeout"; + } + } + + public static class TestReturnValueWorkflowImpl implements TestReturnValueWorkflow { + private boolean unblocked = false; + + @Override + public String execute() { + boolean conditionSatisfied = Workflow.await(Duration.ofHours(1), () -> unblocked); + boolean timedOut = !Workflow.await(Duration.ofMillis(100), () -> false); + return "conditionSatisfied=" + conditionSatisfied + ",timedOut=" + timedOut; + } + + @Override + public void unblock() { + unblocked = true; + } + } +} diff --git a/temporal-sdk/src/test/resources/awaitTimerConditionOldBehavior.json b/temporal-sdk/src/test/resources/awaitTimerConditionOldBehavior.json new file mode 100644 index 000000000..f82728192 --- /dev/null +++ b/temporal-sdk/src/test/resources/awaitTimerConditionOldBehavior.json @@ -0,0 +1,138 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2026-01-07T01:45:58.381Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "TestAwaitWorkflow" + }, + "taskQueue": { + "name": "WorkflowTest-replay-await-timer" + }, + "input": {}, + "workflowExecutionTimeout": "315360000s", + "workflowRunTimeout": "315360000s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "08ddbb4c-d763-4a85-af00-e841226b7f73", + "identity": "test-worker", + "firstExecutionRunId": "08ddbb4c-d763-4a85-af00-e841226b7f73", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {} + } + }, + { + "eventId": "2", + "eventTime": "2026-01-07T01:45:58.381Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "WorkflowTest-replay-await-timer" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2026-01-07T01:45:58.391Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "test-worker" + } + }, + { + "eventId": "4", + "eventTime": "2026-01-07T01:45:58.495Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "identity": "test-worker", + "sdkMetadata": { + "langUsedFlags": [ + 1 + ], + "sdkName": "temporal-java", + "sdkVersion": "1.20.0" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2026-01-07T01:45:58.495Z", + "eventType": "EVENT_TYPE_TIMER_STARTED", + "timerStartedEventAttributes": { + "timerId": "f5baace7-0ef1-310f-93eb-30caf39e6f4a", + "startToFireTimeout": "3600s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "6", + "eventTime": "2026-01-07T01:45:58.896Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "workflowExecutionSignaledEventAttributes": { + "signalName": "unblock", + "input": {}, + "identity": "test-client" + } + }, + { + "eventId": "7", + "eventTime": "2026-01-07T01:45:58.896Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "WorkflowTest-replay-await-timer" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "8", + "eventTime": "2026-01-07T01:45:58.896Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "test-worker" + } + }, + { + "eventId": "9", + "eventTime": "2026-01-07T01:45:58.912Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "7", + "identity": "test-worker", + "sdkMetadata": { + "sdkName": "temporal-java", + "sdkVersion": "1.20.0" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "10", + "eventTime": "2026-01-07T01:45:58.912Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImNvbmRpdGlvbiBzYXRpc2ZpZWQi" + } + ] + }, + "workflowTaskCompletedEventId": "9" + } + } + ] +}