-
Notifications
You must be signed in to change notification settings - Fork 212
Introduce local timeout for activity heartbeats #2804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
fe0df3c
8106ebd
46486b8
a0eb92a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,9 +6,11 @@ | |
| import io.temporal.activity.ActivityExecutionContext; | ||
| import io.temporal.activity.ActivityInfo; | ||
| import io.temporal.api.common.v1.Payloads; | ||
| import io.temporal.api.enums.v1.TimeoutType; | ||
| import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse; | ||
| import io.temporal.client.*; | ||
| import io.temporal.common.converter.DataConverter; | ||
| import io.temporal.failure.TimeoutFailure; | ||
| import io.temporal.internal.client.ActivityClientHelper; | ||
| import io.temporal.payload.context.ActivitySerializationContext; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
|
|
@@ -28,6 +30,20 @@ | |
| class HeartbeatContextImpl implements HeartbeatContext { | ||
| private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class); | ||
| private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000; | ||
| // Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking. | ||
| private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000; | ||
|
|
||
| /** | |
| * Returns the buffer, in milliseconds, that is added to the activity heartbeat timeout when the | |
| * local heartbeat timeout timer is scheduled. | |
| * | |
| * <p>The value is read from the {@code TEMPORAL_ACTIVITY_TIMEOUT_DELAY} environment variable. | |
| * Surrounding whitespace is ignored. If the variable is unset, is not a valid {@code long}, or | |
| * is negative, {@link #DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS} is returned instead (with | |
| * a warning logged for the invalid cases). | |
| * | |
| * @return a non-negative buffer in milliseconds | |
| */ | |
| static long getLocalHeartbeatTimeoutBufferMillis() { | |
| String envVal = System.getenv("TEMPORAL_ACTIVITY_TIMEOUT_DELAY"); | |
| if (envVal != null) { | |
| try { | |
| long parsed = Long.parseLong(envVal.trim()); | |
| if (parsed >= 0) { | |
| return parsed; | |
| } | |
| // A negative buffer could make the scheduled delay zero or negative, which the executor | |
| // treats as "run immediately" and would trigger a spurious local timeout. | |
| log.warn("Invalid TEMPORAL_ACTIVITY_TIMEOUT_DELAY value: {}", envVal); | |
| } catch (NumberFormatException e) { | |
| log.warn("Invalid TEMPORAL_ACTIVITY_TIMEOUT_DELAY value: {}", envVal); | |
| } | |
| } | |
| return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS; | |
| } | |
|
|
||
| private final Lock lock = new ReentrantLock(); | ||
|
|
||
|
|
@@ -42,12 +58,15 @@ class HeartbeatContextImpl implements HeartbeatContext { | |
|
|
||
| private final Scope metricsScope; | ||
| private final Optional<Payloads> prevAttemptHeartbeatDetails; | ||
| private final long heartbeatTimeoutMillis; | ||
| private final long localHeartbeatTimeoutBufferMillis; | ||
|
|
||
| // turned into true on a reception of the first heartbeat | ||
| private boolean receivedAHeartbeat = false; | ||
| private Object lastDetails; | ||
| private boolean hasOutstandingHeartbeat; | ||
| private ScheduledFuture<?> scheduledHeartbeat; | ||
| private ScheduledFuture<?> heartbeatTimeoutFuture; | ||
|
|
||
| private ActivityCompletionException lastException; | ||
|
|
||
|
|
@@ -61,6 +80,30 @@ public HeartbeatContextImpl( | |
| String identity, | ||
| Duration maxHeartbeatThrottleInterval, | ||
| Duration defaultHeartbeatThrottleInterval) { | ||
| this( | ||
| service, | ||
| namespace, | ||
| info, | ||
| dataConverter, | ||
| heartbeatExecutor, | ||
| metricsScope, | ||
| identity, | ||
| maxHeartbeatThrottleInterval, | ||
| defaultHeartbeatThrottleInterval, | ||
| getLocalHeartbeatTimeoutBufferMillis()); | ||
| } | ||
|
|
||
| HeartbeatContextImpl( | ||
| WorkflowServiceStubs service, | ||
| String namespace, | ||
| ActivityInfo info, | ||
| DataConverter dataConverter, | ||
| ScheduledExecutorService heartbeatExecutor, | ||
| Scope metricsScope, | ||
| String identity, | ||
| Duration maxHeartbeatThrottleInterval, | ||
| Duration defaultHeartbeatThrottleInterval, | ||
| long localHeartbeatTimeoutBufferMillis) { | ||
| this.service = service; | ||
| this.metricsScope = metricsScope; | ||
| this.dataConverter = dataConverter; | ||
|
|
@@ -83,6 +126,15 @@ public HeartbeatContextImpl( | |
| info.getHeartbeatTimeout(), | ||
| maxHeartbeatThrottleInterval, | ||
| defaultHeartbeatThrottleInterval); | ||
| this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis(); | ||
| this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis; | ||
| if (this.heartbeatTimeoutMillis > 0) { | ||
| this.heartbeatTimeoutFuture = | ||
| heartbeatExecutor.schedule( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason you used an executor and a future instead of just keeping track of the last time a heartbeat succeeded and throwing an exception if the time since then exceeded the timeout?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Throw from where? The heartbeating method itself? Good point. I did this because I was thinking we'd proactively cancel the activity à la what Core does, but that turns out not to work here without more refactoring. Can certainly change it to not bother with a future, or could do the more elaborate thing. The former is probably sufficient and is easier.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| this::onHeartbeatTimeout, | ||
| heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis, | ||
| TimeUnit.MILLISECONDS); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -167,6 +219,10 @@ public void cancelOutstandingHeartbeat() { | |
| scheduledHeartbeat.cancel(false); | ||
| scheduledHeartbeat = null; | ||
| } | ||
| if (heartbeatTimeoutFuture != null) { | ||
| heartbeatTimeoutFuture.cancel(false); | ||
| heartbeatTimeoutFuture = null; | ||
| } | ||
| hasOutstandingHeartbeat = false; | ||
| } finally { | ||
| lock.unlock(); | ||
|
|
@@ -179,6 +235,9 @@ private void doHeartBeatLocked(Object details) { | |
| sendHeartbeatRequest(details); | ||
| hasOutstandingHeartbeat = false; | ||
| nextHeartbeatDelay = heartbeatIntervalMillis; | ||
| // Reset the local heartbeat timeout timer only on successful send. | ||
| // If sends keep failing, the timer will eventually fire and cancel the activity. | ||
| resetHeartbeatTimeoutLocked(); | ||
| } catch (StatusRuntimeException e) { | ||
| // Not rethrowing to not fail activity implementation on intermittent connection or Temporal | ||
| // errors. | ||
|
|
@@ -214,6 +273,34 @@ private void scheduleNextHeartbeatLocked(long delay) { | |
| TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| /** | |
| * Re-arms the local heartbeat timeout timer: cancels the currently scheduled timeout task and | |
| * schedules a new one that fires after the heartbeat timeout plus the local buffer. Does nothing | |
| * when no timeout task is currently tracked. | |
| */ | |
| private void resetHeartbeatTimeoutLocked() { | |
| if (heartbeatTimeoutFuture == null) { | |
| // No timer is tracked, so there is nothing to re-arm. | |
| return; | |
| } | |
| heartbeatTimeoutFuture.cancel(false); | |
| long delayMillis = heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis; | |
| heartbeatTimeoutFuture = | |
| heartbeatExecutor.schedule(this::onHeartbeatTimeout, delayMillis, TimeUnit.MILLISECONDS); | |
| } | |
|
|
||
| /** | |
| * Task scheduled on the heartbeat executor for the local heartbeat timeout. Under the lock, if | |
| * no exception has been recorded yet, logs a warning and stores an {@link | |
| * ActivityCanceledException} whose cause is a {@link TimeoutFailure} of type {@code | |
| * TIMEOUT_TYPE_HEARTBEAT} in {@code lastException}. | |
| */ | |
| private void onHeartbeatTimeout() { | |
| lock.lock(); | |
| try { | |
| if (lastException != null) { | |
| // An exception is already recorded; leave it untouched. | |
| return; | |
| } | |
| log.warn( | |
| "Activity heartbeat timed out locally. ActivityId={}, activityType={}", | |
| info.getActivityId(), | |
| info.getActivityType()); | |
| TimeoutFailure heartbeatTimeout = | |
| new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT); | |
| lastException = new ActivityCanceledException(info, heartbeatTimeout); | |
| } finally { | |
| lock.unlock(); | |
| } | |
| } | |
|
|
||
| private void sendHeartbeatRequest(Object details) { | ||
| try { | ||
| RecordActivityTaskHeartbeatResponse status = | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| package io.temporal.internal.activity; | ||
|
|
||
| import static org.junit.Assert.*; | ||
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.Mockito.*; | ||
|
|
||
| import com.uber.m3.tally.NoopScope; | ||
| import io.grpc.Status; | ||
| import io.grpc.StatusRuntimeException; | ||
| import io.temporal.activity.ActivityInfo; | ||
| import io.temporal.api.enums.v1.TimeoutType; | ||
| import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse; | ||
| import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; | ||
| import io.temporal.client.ActivityCanceledException; | ||
| import io.temporal.client.ActivityCompletionException; | ||
| import io.temporal.common.converter.GlobalDataConverter; | ||
| import io.temporal.failure.TimeoutFailure; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import io.temporal.testUtils.Eventually; | ||
| import java.time.Duration; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import org.junit.After; | ||
| import org.junit.Before; | ||
| import org.junit.Test; | ||
|
|
||
| public class HeartbeatContextImplTest { | ||
|
|
||
| private static final long TEST_BUFFER_MILLIS = 200; | ||
|
|
||
| private ScheduledExecutorService heartbeatExecutor; | ||
| private WorkflowServiceStubs service; | ||
| private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub; | ||
|
|
||
| /** | |
| * Creates a fresh single-threaded scheduled executor and a mocked {@link WorkflowServiceStubs} | |
| * whose blocking stub is itself a mock that returns itself from {@code withOption}. | |
| */ | |
| @Before | |
| public void setUp() { | |
| blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); | |
| when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); | |
| service = mock(WorkflowServiceStubs.class); | |
| when(service.blockingStub()).thenReturn(blockingStub); | |
| heartbeatExecutor = Executors.newScheduledThreadPool(1); | |
| } | |
|
|
||
| /** Shuts down the heartbeat executor created in {@code setUp()} after each test. */ | |
| @After | |
| public void tearDown() { | |
| heartbeatExecutor.shutdownNow(); | |
| } | |
|
|
||
| /** | |
| * Verifies that when every heartbeat RPC fails with UNAVAILABLE, a later {@code heartbeat()} | |
| * call throws an {@link ActivityCanceledException} caused by a {@link TimeoutFailure} of type | |
| * {@code TIMEOUT_TYPE_HEARTBEAT}, and that this does not happen before the heartbeat timeout | |
| * plus the test buffer has elapsed. | |
| */ | |
| @Test | |
| public void heartbeatTimeoutLocallyCancelsActivity() { | |
| Duration heartbeatTimeout = Duration.ofMillis(500); | |
|
|
||
| // All heartbeat RPCs fail with UNAVAILABLE | |
| when(blockingStub.recordActivityTaskHeartbeat(any())) | |
| .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); | |
|
|
||
| ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); | |
| // The local timeout timer is armed inside the HeartbeatContextImpl constructor, so the clock | |
| // must start before the context is created. Starting it afterwards would let the measured | |
| // elapsed time come out slightly below timeout + buffer and make the assertion flaky. | |
| long startNanos = System.nanoTime(); | |
| HeartbeatContextImpl ctx = createHeartbeatContext(info); | |
|
|
||
| ctx.heartbeat("details-1"); | |
|
|
||
| ActivityCompletionException caught = | |
| Eventually.assertEventually( | |
| Duration.ofSeconds(10), | |
| () -> { | |
| try { | |
| ctx.heartbeat("poll"); | |
| fail("Expected ActivityCanceledException"); | |
| return null; | |
| } catch (ActivityCompletionException e) { | |
| return e; | |
| } | |
| }); | |
|
|
||
| long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis(); | |
|
|
||
| assertSame(ActivityCanceledException.class, caught.getClass()); | |
| assertNotNull("Expected a TimeoutFailure cause", caught.getCause()); | |
| assertSame(TimeoutFailure.class, caught.getCause().getClass()); | |
| assertEquals( | |
| TimeoutType.TIMEOUT_TYPE_HEARTBEAT, ((TimeoutFailure) caught.getCause()).getTimeoutType()); | |
| long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS; | |
| assertTrue( | |
| "Timeout should not fire before heartbeat timeout + buffer (" | |
| + elapsedMs | |
| + "ms elapsed, expected >= " | |
| + expectedMinMs | |
| + "ms)", | |
| elapsedMs >= expectedMinMs); | |
|
|
||
| ctx.cancelOutstandingHeartbeat(); | |
| } | |
|
|
||
| /** | |
| * Verifies that a successful heartbeat send restarts the local heartbeat timeout: the first RPC | |
| * succeeds and all later ones fail, and the resulting {@link ActivityCanceledException} must not | |
| * be observed before heartbeat timeout + buffer has elapsed, measured from just before that first | |
| * heartbeat call. | |
| */ | |
| @Test | |
| public void heartbeatTimeoutResetsOnSuccessfulSend() { | |
| Duration heartbeatTimeout = Duration.ofMillis(500); | |
| AtomicInteger callCount = new AtomicInteger(); | |
|
|
||
| // First call succeeds, then all subsequent calls fail | |
| when(blockingStub.recordActivityTaskHeartbeat(any())) | |
| .thenAnswer( | |
| invocation -> { | |
| if (callCount.getAndIncrement() == 0) { | |
| return RecordActivityTaskHeartbeatResponse.getDefaultInstance(); | |
| } | |
| throw new StatusRuntimeException(Status.UNAVAILABLE); | |
| }); | |
|
|
||
| ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout); | |
| HeartbeatContextImpl ctx = createHeartbeatContext(info); | |
|
|
||
| // The first heartbeat() call sends the RPC synchronously (no scheduled heartbeat yet). | |
| // Record the time before calling — the timer reset happens during this call. | |
| long resetNanos = System.nanoTime(); | |
| ctx.heartbeat("details-1"); | |
| assertEquals("First RPC should have been the successful one", 1, callCount.get()); | |
|
|
||
| // Poll until the timeout fires again (from the reset point) | |
| Eventually.assertEventually( | |
| Duration.ofSeconds(10), | |
| () -> { | |
| try { | |
| ctx.heartbeat("poll"); | |
| fail("Expected ActivityCanceledException"); | |
| } catch (ActivityCanceledException e) { | |
| // expected | |
| } | |
| }); | |
|
|
||
| // resetNanos was taken before the reset happened, so this lower bound holds even though the | |
| // timer was re-armed slightly later than resetNanos. | |
| long elapsedSinceResetMs = Duration.ofNanos(System.nanoTime() - resetNanos).toMillis(); | |
| long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS; | |
| assertTrue( | |
| "Timeout should not fire before heartbeat timeout + buffer from reset point (" | |
| + elapsedSinceResetMs | |
| + "ms elapsed since reset, expected >= " | |
| + expectedMinMs | |
| + "ms)", | |
| elapsedSinceResetMs >= expectedMinMs); | |
|
|
||
| ctx.cancelOutstandingHeartbeat(); | |
| } | |
|
|
||
| /** | |
| * Builds a {@link HeartbeatContextImpl} for the given activity info using the mocked service, | |
| * the shared test executor, a no-op metrics scope, and {@code TEST_BUFFER_MILLIS} as the local | |
| * heartbeat timeout buffer. | |
| */ | |
| private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) { | |
| Duration maxThrottleInterval = Duration.ofSeconds(60); | |
| Duration defaultThrottleInterval = Duration.ofSeconds(30); | |
| return new HeartbeatContextImpl( | |
| service, | |
| "test-namespace", | |
| info, | |
| GlobalDataConverter.get(), | |
| heartbeatExecutor, | |
| new NoopScope(), | |
| "test-identity", | |
| maxThrottleInterval, | |
| defaultThrottleInterval, | |
| TEST_BUFFER_MILLIS); | |
| } | |
|
|
||
| /** | |
| * Creates a mocked {@link ActivityInfo} for a non-local activity with fixed test identifiers, | |
| * no previous heartbeat details, and the supplied heartbeat timeout. | |
| * | |
| * @param heartbeatTimeout value the mock returns from {@code getHeartbeatTimeout()} | |
| * @return the configured mock | |
| */ | |
| private static ActivityInfo activityInfoWithHeartbeatTimeout(Duration heartbeatTimeout) { | |
| ActivityInfo mockedInfo = mock(ActivityInfo.class); | |
| // Identity of the activity and its workflow. | |
| when(mockedInfo.getActivityId()).thenReturn("test-activity-id"); | |
| when(mockedInfo.getActivityType()).thenReturn("test-activity-type"); | |
| when(mockedInfo.getActivityTaskQueue()).thenReturn("test-task-queue"); | |
| when(mockedInfo.getWorkflowId()).thenReturn("test-workflow-id"); | |
| when(mockedInfo.getWorkflowType()).thenReturn("test-workflow-type"); | |
| when(mockedInfo.getTaskToken()).thenReturn(new byte[] {1, 2, 3}); | |
| when(mockedInfo.isLocal()).thenReturn(false); | |
| // Heartbeat-related state. | |
| when(mockedInfo.getHeartbeatDetails()).thenReturn(Optional.empty()); | |
| when(mockedInfo.getHeartbeatTimeout()).thenReturn(heartbeatTimeout); | |
| return mockedInfo; | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did this b/c I didn't want a worker option for a setting I expect basically no one will ever care about adjusting. If we feel otherwise, can promote.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could use a Java system property here, as we did for some other debug configs, but this works as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, that's a good call; let's do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh does no test use this setting? I thought you needed this to make tests faster. If no test needs it I would just make it a constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to keep this option just in case the user who's been asking for this feels they need a tighter bound.