diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java index cf92d80ac..17faf2a14 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java @@ -22,6 +22,10 @@ public ActivityCanceledException(ActivityInfo info) { super(info); } + public ActivityCanceledException(ActivityInfo info, Throwable cause) { + super(info, cause); + } + public ActivityCanceledException() { super(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index 0ad49f948..48993d0da 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -6,9 +6,11 @@ import io.temporal.activity.ActivityExecutionContext; import io.temporal.activity.ActivityInfo; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.enums.v1.TimeoutType; import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse; import io.temporal.client.*; import io.temporal.common.converter.DataConverter; +import io.temporal.failure.TimeoutFailure; import io.temporal.internal.client.ActivityClientHelper; import io.temporal.payload.context.ActivitySerializationContext; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -28,6 +30,21 @@ class HeartbeatContextImpl implements HeartbeatContext { private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class); private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000; + // Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking. + private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000; + static final String LOCAL_TIMEOUT_BUFFER_PROPERTY = "temporal.activity.localTimeoutBufferMs"; + + static long getLocalHeartbeatTimeoutBufferMillis() { + String val = System.getProperty(LOCAL_TIMEOUT_BUFFER_PROPERTY); + if (val != null) { + try { + return Long.parseLong(val); + } catch (NumberFormatException e) { + log.warn("Invalid {} value: {}", LOCAL_TIMEOUT_BUFFER_PROPERTY, val); + } + } + return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS; + } private final Lock lock = new ReentrantLock(); @@ -42,6 +59,8 @@ class HeartbeatContextImpl implements HeartbeatContext { private final Scope metricsScope; private final Optional prevAttemptHeartbeatDetails; + private final long heartbeatTimeoutMillis; + private final long localHeartbeatTimeoutBufferMillis; // turned into true on a reception of the first heartbeat private boolean receivedAHeartbeat = false; @@ -49,6 +68,11 @@ class HeartbeatContextImpl implements HeartbeatContext { private boolean hasOutstandingHeartbeat; private ScheduledFuture scheduledHeartbeat; + // Deadline (in nanos, from System.nanoTime()) by which a successful heartbeat must occur. + // 0 means no local timeout is active. + private long heartbeatTimeoutDeadlineNanos; + private boolean heartbeatTimedOut; + private ActivityCompletionException lastException; public HeartbeatContextImpl( @@ -61,6 +85,30 @@ public HeartbeatContextImpl( String identity, Duration maxHeartbeatThrottleInterval, Duration defaultHeartbeatThrottleInterval) { + this( + service, + namespace, + info, + dataConverter, + heartbeatExecutor, + metricsScope, + identity, + maxHeartbeatThrottleInterval, + defaultHeartbeatThrottleInterval, + getLocalHeartbeatTimeoutBufferMillis()); + } + + HeartbeatContextImpl( + WorkflowServiceStubs service, + String namespace, + ActivityInfo info, + DataConverter dataConverter, + ScheduledExecutorService heartbeatExecutor, + Scope metricsScope, + String identity, + Duration maxHeartbeatThrottleInterval, + Duration defaultHeartbeatThrottleInterval, + long localHeartbeatTimeoutBufferMillis) { this.service = service; this.metricsScope = metricsScope; this.dataConverter = dataConverter; @@ -83,6 +131,11 @@ public HeartbeatContextImpl( info.getHeartbeatTimeout(), maxHeartbeatThrottleInterval, defaultHeartbeatThrottleInterval); + this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis(); + this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis; + if (this.heartbeatTimeoutMillis > 0) { + this.heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos(); + } } /** @@ -95,6 +148,7 @@ public void heartbeat(V details) throws ActivityCompletionException { } lock.lock(); try { + checkHeartbeatTimeoutDeadlineLocked(); receivedAHeartbeat = true; lastDetails = details; hasOutstandingHeartbeat = true; @@ -167,6 +221,7 @@ public void cancelOutstandingHeartbeat() { scheduledHeartbeat.cancel(false); scheduledHeartbeat = null; } + heartbeatTimeoutDeadlineNanos = 0; hasOutstandingHeartbeat = false; } finally { lock.unlock(); @@ -179,6 +234,12 @@ private void doHeartBeatLocked(Object details) { sendHeartbeatRequest(details); hasOutstandingHeartbeat = false; nextHeartbeatDelay = heartbeatIntervalMillis; + // Reset the local heartbeat timeout deadline only on successful send. + // If sends keep failing, the next heartbeat() call after the deadline will cancel the + // activity. + if (heartbeatTimeoutDeadlineNanos != 0) { + heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos(); + } } catch (StatusRuntimeException e) { // Not rethrowing to not fail activity implementation on intermittent connection or Temporal // errors. @@ -214,6 +275,27 @@ private void scheduleNextHeartbeatLocked(long delay) { TimeUnit.MILLISECONDS); } + private long computeHeartbeatTimeoutDeadlineNanos() { + return System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis); + } + + private void checkHeartbeatTimeoutDeadlineLocked() { + if (heartbeatTimedOut) { + throw new ActivityCanceledException( + info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT)); + } + if (heartbeatTimeoutDeadlineNanos != 0 && System.nanoTime() >= heartbeatTimeoutDeadlineNanos) { + heartbeatTimedOut = true; + log.warn( + "Activity heartbeat timed out locally. ActivityId={}, activityType={}", + info.getActivityId(), + info.getActivityType()); + throw new ActivityCanceledException( + info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT)); + } + } + private void sendHeartbeatRequest(Object details) { try { RecordActivityTaskHeartbeatResponse status = diff --git a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java new file mode 100644 index 000000000..686bc566f --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java @@ -0,0 +1,213 @@ +package io.temporal.internal.activity; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.uber.m3.tally.NoopScope; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.activity.ActivityInfo; +import io.temporal.api.enums.v1.TimeoutType; +import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import io.temporal.client.ActivityCanceledException; +import io.temporal.client.ActivityCompletionException; +import io.temporal.common.converter.GlobalDataConverter; +import io.temporal.failure.TimeoutFailure; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.testUtils.Eventually; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class HeartbeatContextImplTest { + + private static final long TEST_BUFFER_MILLIS = 200; + + private ScheduledExecutorService heartbeatExecutor; + private WorkflowServiceStubs service; + private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub; + + @Before + public void setUp() { + heartbeatExecutor = Executors.newScheduledThreadPool(1); + service = mock(WorkflowServiceStubs.class); + blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(service.blockingStub()).thenReturn(blockingStub); + when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); + } + + @After + public void tearDown() { + heartbeatExecutor.shutdownNow(); + } + + @Test + public void heartbeatTimeoutLocallyCancelsActivity() { + Duration heartbeatTimeout = Duration.ofMillis(500); + + // All heartbeat RPCs fail with UNAVAILABLE + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + long startNanos = System.nanoTime(); + ctx.heartbeat("details-1"); + + ActivityCompletionException caught = + Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("poll"); + fail("Expected ActivityCanceledException"); + return null; + } catch (ActivityCompletionException e) { + return e; + } + }); + + long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis(); + + assertSame(ActivityCanceledException.class, caught.getClass()); + assertNotNull("Expected a TimeoutFailure cause", caught.getCause()); + assertSame(TimeoutFailure.class, caught.getCause().getClass()); + assertEquals( + TimeoutType.TIMEOUT_TYPE_HEARTBEAT, ((TimeoutFailure) caught.getCause()).getTimeoutType()); + long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS; + assertTrue( + "Timeout should not fire before heartbeat timeout + buffer (" + + elapsedMs + + "ms elapsed, expected >= " + + expectedMinMs + + "ms)", + elapsedMs >= expectedMinMs); + + ctx.cancelOutstandingHeartbeat(); + } + + @Test + public void heartbeatTimeoutResetsOnSuccessfulSend() { + Duration heartbeatTimeout = Duration.ofMillis(500); + AtomicInteger callCount = new AtomicInteger(); + + // First call succeeds, then all subsequent calls fail + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenAnswer( + invocation -> { + if (callCount.getAndIncrement() == 0) { + return RecordActivityTaskHeartbeatResponse.getDefaultInstance(); + } + throw new StatusRuntimeException(Status.UNAVAILABLE); + }); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + // The first heartbeat() call sends the RPC synchronously (no scheduled heartbeat yet). + // Record the time before calling — the timer reset happens during this call. + long resetNanos = System.nanoTime(); + ctx.heartbeat("details-1"); + assertEquals("First RPC should have been the successful one", 1, callCount.get()); + + // Poll until the timeout fires again (from the reset point) + Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("poll"); + fail("Expected ActivityCanceledException"); + } catch (ActivityCanceledException e) { + // expected + } + }); + + long elapsedSinceResetMs = Duration.ofNanos(System.nanoTime() - resetNanos).toMillis(); + long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS; + assertTrue( + "Timeout should not fire before heartbeat timeout + buffer from reset point (" + + elapsedSinceResetMs + + "ms elapsed since reset, expected >= " + + expectedMinMs + + "ms)", + elapsedSinceResetMs >= expectedMinMs); + + ctx.cancelOutstandingHeartbeat(); + } + + @Test + public void heartbeatTimeoutPersistsAcrossMultipleCalls() { + Duration heartbeatTimeout = Duration.ofMillis(500); + + // All heartbeat RPCs fail with UNAVAILABLE + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + ctx.heartbeat("details-1"); + + // Wait for timeout to fire + Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("poll"); + fail("Expected ActivityCanceledException"); + } catch (ActivityCanceledException e) { + // expected + } + }); + + // Subsequent calls should continue to throw + for (int i = 0; i < 5; i++) { + try { + ctx.heartbeat("details-" + i); + fail("Expected ActivityCanceledException on call " + i); + } catch (ActivityCompletionException e) { + assertSame(ActivityCanceledException.class, e.getClass()); + assertNotNull(e.getCause()); + assertSame(TimeoutFailure.class, e.getCause().getClass()); + } + } + + ctx.cancelOutstandingHeartbeat(); + } + + private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) { + return new HeartbeatContextImpl( + service, + "test-namespace", + info, + GlobalDataConverter.get(), + heartbeatExecutor, + new NoopScope(), + "test-identity", + Duration.ofSeconds(60), + Duration.ofSeconds(30), + TEST_BUFFER_MILLIS); + } + + private static ActivityInfo activityInfoWithHeartbeatTimeout(Duration heartbeatTimeout) { + ActivityInfo info = mock(ActivityInfo.class); + when(info.getHeartbeatTimeout()).thenReturn(heartbeatTimeout); + when(info.getTaskToken()).thenReturn(new byte[] {1, 2, 3}); + when(info.getWorkflowId()).thenReturn("test-workflow-id"); + when(info.getWorkflowType()).thenReturn("test-workflow-type"); + when(info.getActivityType()).thenReturn("test-activity-type"); + when(info.getActivityTaskQueue()).thenReturn("test-task-queue"); + when(info.getActivityId()).thenReturn("test-activity-id"); + when(info.isLocal()).thenReturn(false); + when(info.getHeartbeatDetails()).thenReturn(Optional.empty()); + return info; + } +}