Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public ActivityCanceledException(ActivityInfo info) {
super(info);
}

/**
 * Creates a cancellation exception for the given activity with an underlying cause.
 *
 * <p>Both arguments are forwarded unchanged to the superclass constructor.
 *
 * @param info the activity this exception is reported for
 * @param cause the throwable recorded as the cause of this cancellation
 */
public ActivityCanceledException(ActivityInfo info, Throwable cause) {
super(info, cause);
}

/**
 * Creates a cancellation exception that carries no activity information and no cause; delegates
 * to the superclass no-argument constructor.
 */
public ActivityCanceledException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInfo;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
import io.temporal.client.*;
import io.temporal.common.converter.DataConverter;
import io.temporal.failure.TimeoutFailure;
import io.temporal.internal.client.ActivityClientHelper;
import io.temporal.payload.context.ActivitySerializationContext;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand All @@ -28,6 +30,20 @@
class HeartbeatContextImpl implements HeartbeatContext {
private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class);
private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000;
// Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking.
private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000;

static long getLocalHeartbeatTimeoutBufferMillis() {
String envVal = System.getenv("TEMPORAL_ACTIVITY_TIMEOUT_DELAY");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this b/c I didn't want a worker option for a setting I expect basically no one will ever care about adjusting. If we feel otherwise, can promote.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use a Java property here, as we did for some other debug configs, but this works as well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that's a good call let's do that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh does no test use this setting? I thought you needed this to make tests faster. If no test needs it I would just make it a constant.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to keep this option just in case the user who's been asking for this feels they need a tighter bound.

if (envVal != null) {
try {
return Long.parseLong(envVal);
} catch (NumberFormatException e) {
log.warn("Invalid TEMPORAL_ACTIVITY_TIMEOUT_DELAY value: {}", envVal);
}
}
return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS;
}

private final Lock lock = new ReentrantLock();

Expand All @@ -42,12 +58,15 @@ class HeartbeatContextImpl implements HeartbeatContext {

private final Scope metricsScope;
private final Optional<Payloads> prevAttemptHeartbeatDetails;
private final long heartbeatTimeoutMillis;
private final long localHeartbeatTimeoutBufferMillis;

// turned into true on a reception of the first heartbeat
private boolean receivedAHeartbeat = false;
private Object lastDetails;
private boolean hasOutstandingHeartbeat;
private ScheduledFuture<?> scheduledHeartbeat;
private ScheduledFuture<?> heartbeatTimeoutFuture;

private ActivityCompletionException lastException;

Expand All @@ -61,6 +80,30 @@ public HeartbeatContextImpl(
String identity,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval) {
this(
service,
namespace,
info,
dataConverter,
heartbeatExecutor,
metricsScope,
identity,
maxHeartbeatThrottleInterval,
defaultHeartbeatThrottleInterval,
getLocalHeartbeatTimeoutBufferMillis());
}

HeartbeatContextImpl(
WorkflowServiceStubs service,
String namespace,
ActivityInfo info,
DataConverter dataConverter,
ScheduledExecutorService heartbeatExecutor,
Scope metricsScope,
String identity,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval,
long localHeartbeatTimeoutBufferMillis) {
this.service = service;
this.metricsScope = metricsScope;
this.dataConverter = dataConverter;
Expand All @@ -83,6 +126,15 @@ public HeartbeatContextImpl(
info.getHeartbeatTimeout(),
maxHeartbeatThrottleInterval,
defaultHeartbeatThrottleInterval);
this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis();
this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis;
if (this.heartbeatTimeoutMillis > 0) {
this.heartbeatTimeoutFuture =
heartbeatExecutor.schedule(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you used an executor and a future instead of just keeping track of the last time a heartbeat succeeded and throwing an exception if it exceeded the timeout?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw from where? The heartbeating method itself? Good point. I did this because I was thinking we'd proactively cancel the activity a-la what Core does, but, that turns out not to work here without more refactoring.

Can certainly change it to not bother w/ a future, or could do the more elaborate thing. Former is probably sufficient and is easier.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this::onHeartbeatTimeout,
heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis,
TimeUnit.MILLISECONDS);
}
}

/**
Expand Down Expand Up @@ -167,6 +219,10 @@ public void cancelOutstandingHeartbeat() {
scheduledHeartbeat.cancel(false);
scheduledHeartbeat = null;
}
if (heartbeatTimeoutFuture != null) {
heartbeatTimeoutFuture.cancel(false);
heartbeatTimeoutFuture = null;
}
hasOutstandingHeartbeat = false;
} finally {
lock.unlock();
Expand All @@ -179,6 +235,9 @@ private void doHeartBeatLocked(Object details) {
sendHeartbeatRequest(details);
hasOutstandingHeartbeat = false;
nextHeartbeatDelay = heartbeatIntervalMillis;
// Reset the local heartbeat timeout timer only on successful send.
// If sends keep failing, the timer will eventually fire and cancel the activity.
resetHeartbeatTimeoutLocked();
} catch (StatusRuntimeException e) {
// Not rethrowing to not fail activity implementation on intermittent connection or Temporal
// errors.
Expand Down Expand Up @@ -214,6 +273,34 @@ private void scheduleNextHeartbeatLocked(long delay) {
TimeUnit.MILLISECONDS);
}

/**
 * Re-arms the local heartbeat timeout timer so that it fires a full {@code heartbeatTimeoutMillis
 * + localHeartbeatTimeoutBufferMillis} from now.
 *
 * <p>Does nothing when no timer is currently tracked, i.e. when {@code heartbeatTimeoutFuture} is
 * {@code null}.
 */
private void resetHeartbeatTimeoutLocked() {
  if (heartbeatTimeoutFuture == null) {
    // No timer is tracked, so there is nothing to re-arm.
    return;
  }
  // Drop the pending timer without interrupting a callback that may already be running.
  heartbeatTimeoutFuture.cancel(false);
  long timeoutDelayMillis = heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis;
  heartbeatTimeoutFuture =
      heartbeatExecutor.schedule(
          this::onHeartbeatTimeout, timeoutDelayMillis, TimeUnit.MILLISECONDS);
}

/**
 * Callback run by the heartbeat executor when the local heartbeat timeout timer expires.
 *
 * <p>Under {@code lock}, records an {@link ActivityCanceledException} whose cause is a {@link
 * TimeoutFailure} of type {@code TIMEOUT_TYPE_HEARTBEAT} as {@code lastException}. An exception
 * that is already recorded is left untouched, so the first failure wins.
 */
private void onHeartbeatTimeout() {
  lock.lock();
  try {
    if (lastException != null) {
      // An earlier completion exception is already recorded; do not overwrite it.
      return;
    }
    log.warn(
        "Activity heartbeat timed out locally. ActivityId={}, activityType={}",
        info.getActivityId(),
        info.getActivityType());
    TimeoutFailure heartbeatTimeoutCause =
        new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT);
    lastException = new ActivityCanceledException(info, heartbeatTimeoutCause);
  } finally {
    lock.unlock();
  }
}

private void sendHeartbeatRequest(Object details) {
try {
RecordActivityTaskHeartbeatResponse status =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package io.temporal.internal.activity;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import com.uber.m3.tally.NoopScope;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.activity.ActivityInfo;
import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.client.ActivityCanceledException;
import io.temporal.client.ActivityCompletionException;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.failure.TimeoutFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testUtils.Eventually;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for the local heartbeat timeout handling in {@link HeartbeatContextImpl}.
 *
 * <p>The workflow service is mocked so that heartbeat RPCs can be made to fail or succeed on
 * demand. A short heartbeat timeout and a short local buffer keep the tests fast.
 */
public class HeartbeatContextImplTest {

  /** Local timeout buffer passed to the context under test instead of the production default. */
  private static final long TEST_BUFFER_MILLIS = 200;

  private ScheduledExecutorService heartbeatExecutor;
  private WorkflowServiceStubs service;
  private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub;

  @Before
  public void setUp() {
    heartbeatExecutor = Executors.newScheduledThreadPool(1);
    service = mock(WorkflowServiceStubs.class);
    blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
    when(service.blockingStub()).thenReturn(blockingStub);
    when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
  }

  @After
  public void tearDown() {
    heartbeatExecutor.shutdownNow();
  }

  /**
   * When every heartbeat RPC fails, the context must eventually report an {@link
   * ActivityCanceledException} caused by a heartbeat {@link TimeoutFailure}, and not earlier than
   * heartbeat timeout + buffer after the context was created.
   */
  @Test
  public void heartbeatTimeoutLocallyCancelsActivity() {
    Duration heartbeatTimeout = Duration.ofMillis(500);

    // All heartbeat RPCs fail with UNAVAILABLE
    when(blockingStub.recordActivityTaskHeartbeat(any()))
        .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));

    ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);

    // The timeout timer is armed inside the HeartbeatContextImpl constructor, so the reference
    // timestamp must be taken BEFORE construction. Taking it afterwards would make the measured
    // elapsed time slightly shorter than the real timer delay and the lower-bound assertion
    // below could fail spuriously.
    long startNanos = System.nanoTime();
    HeartbeatContextImpl ctx = createHeartbeatContext(info);

    ctx.heartbeat("details-1");

    ActivityCompletionException caught =
        Eventually.assertEventually(
            Duration.ofSeconds(10),
            () -> {
              try {
                ctx.heartbeat("poll");
                fail("Expected ActivityCanceledException");
                return null;
              } catch (ActivityCompletionException e) {
                return e;
              }
            });

    long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();

    assertSame(ActivityCanceledException.class, caught.getClass());
    assertNotNull("Expected a TimeoutFailure cause", caught.getCause());
    assertSame(TimeoutFailure.class, caught.getCause().getClass());
    assertEquals(
        TimeoutType.TIMEOUT_TYPE_HEARTBEAT, ((TimeoutFailure) caught.getCause()).getTimeoutType());
    long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
    assertTrue(
        "Timeout should not fire before heartbeat timeout + buffer ("
            + elapsedMs
            + "ms elapsed, expected >= "
            + expectedMinMs
            + "ms)",
        elapsedMs >= expectedMinMs);

    ctx.cancelOutstandingHeartbeat();
  }

  /**
   * A successful heartbeat send must restart the local timeout, so the cancellation is reported no
   * earlier than heartbeat timeout + buffer after that successful send.
   */
  @Test
  public void heartbeatTimeoutResetsOnSuccessfulSend() {
    Duration heartbeatTimeout = Duration.ofMillis(500);
    AtomicInteger callCount = new AtomicInteger();

    // First call succeeds, then all subsequent calls fail
    when(blockingStub.recordActivityTaskHeartbeat(any()))
        .thenAnswer(
            invocation -> {
              if (callCount.getAndIncrement() == 0) {
                return RecordActivityTaskHeartbeatResponse.getDefaultInstance();
              }
              throw new StatusRuntimeException(Status.UNAVAILABLE);
            });

    ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
    HeartbeatContextImpl ctx = createHeartbeatContext(info);

    // The first heartbeat() call sends the RPC synchronously (no scheduled heartbeat yet).
    // Record the time before calling — the timer reset happens during this call.
    long resetNanos = System.nanoTime();
    ctx.heartbeat("details-1");
    assertEquals("First RPC should have been the successful one", 1, callCount.get());

    // Poll until the timeout fires again (from the reset point)
    Eventually.assertEventually(
        Duration.ofSeconds(10),
        () -> {
          try {
            ctx.heartbeat("poll");
            fail("Expected ActivityCanceledException");
          } catch (ActivityCanceledException e) {
            // expected
          }
        });

    long elapsedSinceResetMs = Duration.ofNanos(System.nanoTime() - resetNanos).toMillis();
    long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
    assertTrue(
        "Timeout should not fire before heartbeat timeout + buffer from reset point ("
            + elapsedSinceResetMs
            + "ms elapsed since reset, expected >= "
            + expectedMinMs
            + "ms)",
        elapsedSinceResetMs >= expectedMinMs);

    ctx.cancelOutstandingHeartbeat();
  }

  /** Builds a context wired to the mocked service and the short test buffer. */
  private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) {
    return new HeartbeatContextImpl(
        service,
        "test-namespace",
        info,
        GlobalDataConverter.get(),
        heartbeatExecutor,
        new NoopScope(),
        "test-identity",
        Duration.ofSeconds(60),
        Duration.ofSeconds(30),
        TEST_BUFFER_MILLIS);
  }

  /** Creates a mocked {@link ActivityInfo} that reports the given heartbeat timeout. */
  private static ActivityInfo activityInfoWithHeartbeatTimeout(Duration heartbeatTimeout) {
    ActivityInfo info = mock(ActivityInfo.class);
    when(info.getHeartbeatTimeout()).thenReturn(heartbeatTimeout);
    when(info.getTaskToken()).thenReturn(new byte[] {1, 2, 3});
    when(info.getWorkflowId()).thenReturn("test-workflow-id");
    when(info.getWorkflowType()).thenReturn("test-workflow-type");
    when(info.getActivityType()).thenReturn("test-activity-type");
    when(info.getActivityTaskQueue()).thenReturn("test-task-queue");
    when(info.getActivityId()).thenReturn("test-activity-id");
    when(info.isLocal()).thenReturn(false);
    when(info.getHeartbeatDetails()).thenReturn(Optional.empty());
    return info;
  }
}
Loading