From fe0df3c7555594ab1d49137f86c45999318d7811 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 11 Mar 2026 15:50:03 -0700 Subject: [PATCH 1/3] Introduce local timeout for activity heartbeats --- .../client/ActivityCanceledException.java | 4 + .../activity/HeartbeatContextImpl.java | 87 +++++++++ .../activity/HeartbeatContextImplTest.java | 173 ++++++++++++++++++ 3 files changed, 264 insertions(+) create mode 100644 temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java index cf92d80ac8..17faf2a14f 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java @@ -22,6 +22,10 @@ public ActivityCanceledException(ActivityInfo info) { super(info); } + public ActivityCanceledException(ActivityInfo info, Throwable cause) { + super(info, cause); + } + public ActivityCanceledException() { super(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index 0ad49f9485..59a7d18a4a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -6,9 +6,11 @@ import io.temporal.activity.ActivityExecutionContext; import io.temporal.activity.ActivityInfo; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.enums.v1.TimeoutType; import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse; import io.temporal.client.*; import io.temporal.common.converter.DataConverter; +import io.temporal.failure.TimeoutFailure; import io.temporal.internal.client.ActivityClientHelper; import io.temporal.payload.context.ActivitySerializationContext; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -28,6 +30,20 @@ class HeartbeatContextImpl implements HeartbeatContext { private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class); private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000; + // Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking. + private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000; + + static long getLocalHeartbeatTimeoutBufferMillis() { + String envVal = System.getenv("TEMPORAL_ACTIVITY_TIMEOUT_DELAY"); + if (envVal != null) { + try { + return Long.parseLong(envVal); + } catch (NumberFormatException e) { + log.warn("Invalid TEMPORAL_ACTIVITY_TIMEOUT_DELAY value: {}", envVal); + } + } + return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS; + } private final Lock lock = new ReentrantLock(); @@ -42,12 +58,15 @@ class HeartbeatContextImpl implements HeartbeatContext { private final Scope metricsScope; private final Optional prevAttemptHeartbeatDetails; + private final long heartbeatTimeoutMillis; + private final long localHeartbeatTimeoutBufferMillis; // turned into true on a reception of the first heartbeat private boolean receivedAHeartbeat = false; private Object lastDetails; private boolean hasOutstandingHeartbeat; private ScheduledFuture scheduledHeartbeat; + private ScheduledFuture heartbeatTimeoutFuture; private ActivityCompletionException lastException; @@ -61,6 +80,30 @@ public HeartbeatContextImpl( String identity, Duration maxHeartbeatThrottleInterval, Duration defaultHeartbeatThrottleInterval) { + this( + service, + namespace, + info, + dataConverter, + heartbeatExecutor, + metricsScope, + identity, + maxHeartbeatThrottleInterval, + defaultHeartbeatThrottleInterval, + getLocalHeartbeatTimeoutBufferMillis()); + } + + HeartbeatContextImpl( + WorkflowServiceStubs service, + String namespace, + ActivityInfo info, + DataConverter dataConverter, + ScheduledExecutorService heartbeatExecutor, + Scope metricsScope, + String identity, + Duration maxHeartbeatThrottleInterval, + Duration defaultHeartbeatThrottleInterval, + long localHeartbeatTimeoutBufferMillis) { this.service = service; this.metricsScope = metricsScope; this.dataConverter = dataConverter; @@ -83,6 +126,15 @@ public HeartbeatContextImpl( info.getHeartbeatTimeout(), maxHeartbeatThrottleInterval, defaultHeartbeatThrottleInterval); + this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis(); + this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis; + if (this.heartbeatTimeoutMillis > 0) { + this.heartbeatTimeoutFuture = + heartbeatExecutor.schedule( + this::onHeartbeatTimeout, + heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis, + TimeUnit.MILLISECONDS); + } } /** @@ -167,6 +219,10 @@ public void cancelOutstandingHeartbeat() { scheduledHeartbeat.cancel(false); scheduledHeartbeat = null; } + if (heartbeatTimeoutFuture != null) { + heartbeatTimeoutFuture.cancel(false); + heartbeatTimeoutFuture = null; + } hasOutstandingHeartbeat = false; } finally { lock.unlock(); @@ -179,6 +235,9 @@ private void doHeartBeatLocked(Object details) { sendHeartbeatRequest(details); hasOutstandingHeartbeat = false; nextHeartbeatDelay = heartbeatIntervalMillis; + // Reset the local heartbeat timeout timer only on successful send. + // If sends keep failing, the timer will eventually fire and cancel the activity. + resetHeartbeatTimeoutLocked(); } catch (StatusRuntimeException e) { // Not rethrowing to not fail activity implementation on intermittent connection or Temporal // errors. @@ -214,6 +273,34 @@ private void scheduleNextHeartbeatLocked(long delay) { TimeUnit.MILLISECONDS); } + private void resetHeartbeatTimeoutLocked() { + if (heartbeatTimeoutFuture != null) { + heartbeatTimeoutFuture.cancel(false); + heartbeatTimeoutFuture = + heartbeatExecutor.schedule( + this::onHeartbeatTimeout, + heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis, + TimeUnit.MILLISECONDS); + } + } + + private void onHeartbeatTimeout() { + lock.lock(); + try { + if (lastException == null) { + log.warn( + "Activity heartbeat timed out locally. ActivityId={}, activityType={}", + info.getActivityId(), + info.getActivityType()); + lastException = + new ActivityCanceledException( + info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT)); + } + } finally { + lock.unlock(); + } + } + private void sendHeartbeatRequest(Object details) { try { RecordActivityTaskHeartbeatResponse status = diff --git a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java new file mode 100644 index 0000000000..af71999a15 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java @@ -0,0 +1,173 @@ +package io.temporal.internal.activity; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.uber.m3.tally.NoopScope; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.activity.ActivityInfo; +import io.temporal.api.enums.v1.TimeoutType; +import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import io.temporal.client.ActivityCanceledException; +import io.temporal.client.ActivityCompletionException; +import io.temporal.common.converter.GlobalDataConverter; +import io.temporal.failure.TimeoutFailure; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.testUtils.Eventually; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class HeartbeatContextImplTest { + + private static final long TEST_BUFFER_MILLIS = 200; + + private ScheduledExecutorService heartbeatExecutor; + private WorkflowServiceStubs service; + private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub; + + @Before + public void setUp() { + heartbeatExecutor = Executors.newScheduledThreadPool(1); + service = mock(WorkflowServiceStubs.class); + blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(service.blockingStub()).thenReturn(blockingStub); + when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); + } + + @After + public void tearDown() { + heartbeatExecutor.shutdownNow(); + } + + @Test + public void heartbeatTimeoutLocallyCancelsActivity() { + Duration heartbeatTimeout = Duration.ofMillis(500); + + // All heartbeat RPCs fail with UNAVAILABLE + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + long startNanos = System.nanoTime(); + ctx.heartbeat("details-1"); + + ActivityCompletionException caught = + Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("poll"); + fail("Expected ActivityCanceledException"); + return null; + } catch (ActivityCompletionException e) { + return e; + } + }); + + long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis(); + + assertSame(ActivityCanceledException.class, caught.getClass()); + assertNotNull("Expected a TimeoutFailure cause", caught.getCause()); + assertSame(TimeoutFailure.class, caught.getCause().getClass()); + assertEquals( + TimeoutType.TIMEOUT_TYPE_HEARTBEAT, ((TimeoutFailure) caught.getCause()).getTimeoutType()); + long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS; + assertTrue( + "Timeout should not fire before heartbeat timeout + buffer (" + + elapsedMs + + "ms elapsed, expected >= " + + expectedMinMs + + "ms)", + elapsedMs >= expectedMinMs); + + ctx.cancelOutstandingHeartbeat(); + } + + @Test + public void heartbeatTimeoutResetsOnSuccessfulSend() { + Duration heartbeatTimeout = Duration.ofMillis(500); + AtomicInteger callCount = new AtomicInteger(); + + // First call succeeds, then all subsequent calls fail + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenAnswer( + invocation -> { + if (callCount.getAndIncrement() == 0) { + return RecordActivityTaskHeartbeatResponse.getDefaultInstance(); + } + throw new StatusRuntimeException(Status.UNAVAILABLE); + }); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + // The first heartbeat() call sends the RPC synchronously (no scheduled heartbeat yet). + // Record the time before calling — the timer reset happens during this call. + long resetNanos = System.nanoTime(); + ctx.heartbeat("details-1"); + assertEquals("First RPC should have been the successful one", 1, callCount.get()); + + // Poll until the timeout fires again (from the reset point) + Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("poll"); + fail("Expected ActivityCanceledException"); + } catch (ActivityCanceledException e) { + // expected + } + }); + + long elapsedSinceResetMs = Duration.ofNanos(System.nanoTime() - resetNanos).toMillis(); + long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS; + assertTrue( + "Timeout should not fire before heartbeat timeout + buffer from reset point (" + + elapsedSinceResetMs + + "ms elapsed since reset, expected >= " + + expectedMinMs + + "ms)", + elapsedSinceResetMs >= expectedMinMs); + + ctx.cancelOutstandingHeartbeat(); + } + + private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) { + return new HeartbeatContextImpl( + service, + "test-namespace", + info, + GlobalDataConverter.get(), + heartbeatExecutor, + new NoopScope(), + "test-identity", + Duration.ofSeconds(60), + Duration.ofSeconds(30), + TEST_BUFFER_MILLIS); + } + + private static ActivityInfo activityInfoWithHeartbeatTimeout(Duration heartbeatTimeout) { + ActivityInfo info = mock(ActivityInfo.class); + when(info.getHeartbeatTimeout()).thenReturn(heartbeatTimeout); + when(info.getTaskToken()).thenReturn(new byte[] {1, 2, 3}); + when(info.getWorkflowId()).thenReturn("test-workflow-id"); + when(info.getWorkflowType()).thenReturn("test-workflow-type"); + when(info.getActivityType()).thenReturn("test-activity-type"); + when(info.getActivityTaskQueue()).thenReturn("test-task-queue"); + when(info.getActivityId()).thenReturn("test-activity-id"); + when(info.isLocal()).thenReturn(false); + when(info.getHeartbeatDetails()).thenReturn(Optional.empty()); + return info; + } +} From 46486b8901326b4d11594629f7bfc52041a98b77 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 12 Mar 2026 14:03:44 -0700 Subject: [PATCH 2/3] Use Java property instead of env var --- .../temporal/internal/activity/HeartbeatContextImpl.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index 59a7d18a4a..086b4d0f7b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -32,14 +32,15 @@ class HeartbeatContextImpl implements HeartbeatContext { private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000; // Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking. private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000; + static final String LOCAL_TIMEOUT_BUFFER_PROPERTY = "temporal.activity.localTimeoutBufferMs"; static long getLocalHeartbeatTimeoutBufferMillis() { - String envVal = System.getenv("TEMPORAL_ACTIVITY_TIMEOUT_DELAY"); - if (envVal != null) { + String val = System.getProperty(LOCAL_TIMEOUT_BUFFER_PROPERTY); + if (val != null) { try { - return Long.parseLong(envVal); + return Long.parseLong(val); } catch (NumberFormatException e) { - log.warn("Invalid TEMPORAL_ACTIVITY_TIMEOUT_DELAY value: {}", envVal); + log.warn("Invalid {} value: {}", LOCAL_TIMEOUT_BUFFER_PROPERTY, val); } } return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS; From a0eb92aa0e5265b9472dd2228ddb3664dd6dfc49 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 12 Mar 2026 14:16:46 -0700 Subject: [PATCH 3/3] Get rid of unnecessary future --- .../activity/HeartbeatContextImpl.java | 66 +++++++++---------- .../activity/HeartbeatContextImplTest.java | 40 +++++++++++ 2 files changed, 70 insertions(+), 36 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index 086b4d0f7b..48993d0da1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -67,7 +67,11 @@ static long getLocalHeartbeatTimeoutBufferMillis() { private Object lastDetails; private boolean hasOutstandingHeartbeat; private ScheduledFuture scheduledHeartbeat; - private ScheduledFuture heartbeatTimeoutFuture; + + // Deadline (in nanos, from System.nanoTime()) by which a successful heartbeat must occur. + // 0 means no local timeout is active. + private long heartbeatTimeoutDeadlineNanos; + private boolean heartbeatTimedOut; private ActivityCompletionException lastException; @@ -130,11 +134,7 @@ public HeartbeatContextImpl( this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis(); this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis; if (this.heartbeatTimeoutMillis > 0) { - this.heartbeatTimeoutFuture = - heartbeatExecutor.schedule( - this::onHeartbeatTimeout, - heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis, - TimeUnit.MILLISECONDS); + this.heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos(); } } @@ -148,6 +148,7 @@ public void heartbeat(V details) throws ActivityCompletionException { } lock.lock(); try { + checkHeartbeatTimeoutDeadlineLocked(); receivedAHeartbeat = true; lastDetails = details; hasOutstandingHeartbeat = true; @@ -220,10 +221,7 @@ public void cancelOutstandingHeartbeat() { scheduledHeartbeat.cancel(false); scheduledHeartbeat = null; } - if (heartbeatTimeoutFuture != null) { - heartbeatTimeoutFuture.cancel(false); - heartbeatTimeoutFuture = null; - } + heartbeatTimeoutDeadlineNanos = 0; hasOutstandingHeartbeat = false; } finally { lock.unlock(); @@ -236,9 +234,12 @@ private void doHeartBeatLocked(Object details) { sendHeartbeatRequest(details); hasOutstandingHeartbeat = false; nextHeartbeatDelay = heartbeatIntervalMillis; - // Reset the local heartbeat timeout timer only on successful send. - // If sends keep failing, the timer will eventually fire and cancel the activity. - resetHeartbeatTimeoutLocked(); + // Reset the local heartbeat timeout deadline only on successful send. + // If sends keep failing, the next heartbeat() call after the deadline will cancel the + // activity. + if (heartbeatTimeoutDeadlineNanos != 0) { + heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos(); + } } catch (StatusRuntimeException e) { // Not rethrowing to not fail activity implementation on intermittent connection or Temporal // errors. @@ -274,31 +275,24 @@ private void scheduleNextHeartbeatLocked(long delay) { TimeUnit.MILLISECONDS); } - private void resetHeartbeatTimeoutLocked() { - if (heartbeatTimeoutFuture != null) { - heartbeatTimeoutFuture.cancel(false); - heartbeatTimeoutFuture = - heartbeatExecutor.schedule( - this::onHeartbeatTimeout, - heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis, - TimeUnit.MILLISECONDS); - } + private long computeHeartbeatTimeoutDeadlineNanos() { + return System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis); } - private void onHeartbeatTimeout() { - lock.lock(); - try { - if (lastException == null) { - log.warn( - "Activity heartbeat timed out locally. ActivityId={}, activityType={}", - info.getActivityId(), - info.getActivityType()); - lastException = - new ActivityCanceledException( - info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT)); - } - } finally { - lock.unlock(); + private void checkHeartbeatTimeoutDeadlineLocked() { + if (heartbeatTimedOut) { + throw new ActivityCanceledException( + info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT)); + } + if (heartbeatTimeoutDeadlineNanos != 0 && System.nanoTime() >= heartbeatTimeoutDeadlineNanos) { + heartbeatTimedOut = true; + log.warn( + "Activity heartbeat timed out locally. ActivityId={}, activityType={}", + info.getActivityId(), + info.getActivityType()); + throw new ActivityCanceledException( + info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT)); } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java index af71999a15..686bc566f8 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java @@ -143,6 +143,46 @@ public void heartbeatTimeoutResetsOnSuccessfulSend() { ctx.cancelOutstandingHeartbeat(); } + @Test + public void heartbeatTimeoutPersistsAcrossMultipleCalls() { + Duration heartbeatTimeout = Duration.ofMillis(500); + + // All heartbeat RPCs fail with UNAVAILABLE + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + ctx.heartbeat("details-1"); + + // Wait for timeout to fire + Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("poll"); + fail("Expected ActivityCanceledException"); + } catch (ActivityCanceledException e) { + // expected + } + }); + + // Subsequent calls should continue to throw + for (int i = 0; i < 5; i++) { + try { + ctx.heartbeat("details-" + i); + fail("Expected ActivityCanceledException on call " + i); + } catch (ActivityCompletionException e) { + assertSame(ActivityCanceledException.class, e.getClass()); + assertNotNull(e.getCause()); + assertSame(TimeoutFailure.class, e.getCause().getClass()); + } + } + + ctx.cancelOutstandingHeartbeat(); + } + private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) { return new HeartbeatContextImpl( service,