Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public ActivityCanceledException(ActivityInfo info) {
super(info);
}

public ActivityCanceledException(ActivityInfo info, Throwable cause) {
super(info, cause);
}

public ActivityCanceledException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInfo;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
import io.temporal.client.*;
import io.temporal.common.converter.DataConverter;
import io.temporal.failure.TimeoutFailure;
import io.temporal.internal.client.ActivityClientHelper;
import io.temporal.payload.context.ActivitySerializationContext;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand All @@ -28,6 +30,21 @@
class HeartbeatContextImpl implements HeartbeatContext {
private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class);
private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000;
// Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking.
private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000;
static final String LOCAL_TIMEOUT_BUFFER_PROPERTY = "temporal.activity.localTimeoutBufferMs";

static long getLocalHeartbeatTimeoutBufferMillis() {
String val = System.getProperty(LOCAL_TIMEOUT_BUFFER_PROPERTY);
if (val != null) {
try {
return Long.parseLong(val);
} catch (NumberFormatException e) {
log.warn("Invalid {} value: {}", LOCAL_TIMEOUT_BUFFER_PROPERTY, val);
}
}
return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS;
}

private final Lock lock = new ReentrantLock();

Expand All @@ -42,13 +59,20 @@ class HeartbeatContextImpl implements HeartbeatContext {

private final Scope metricsScope;
private final Optional<Payloads> prevAttemptHeartbeatDetails;
private final long heartbeatTimeoutMillis;
private final long localHeartbeatTimeoutBufferMillis;

// turned into true on a reception of the first heartbeat
private boolean receivedAHeartbeat = false;
private Object lastDetails;
private boolean hasOutstandingHeartbeat;
private ScheduledFuture<?> scheduledHeartbeat;

// Deadline (in nanos, from System.nanoTime()) by which a successful heartbeat must occur.
// 0 means no local timeout is active.
private long heartbeatTimeoutDeadlineNanos;
private boolean heartbeatTimedOut;

private ActivityCompletionException lastException;

public HeartbeatContextImpl(
Expand All @@ -61,6 +85,30 @@ public HeartbeatContextImpl(
String identity,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval) {
this(
service,
namespace,
info,
dataConverter,
heartbeatExecutor,
metricsScope,
identity,
maxHeartbeatThrottleInterval,
defaultHeartbeatThrottleInterval,
getLocalHeartbeatTimeoutBufferMillis());
}

HeartbeatContextImpl(
WorkflowServiceStubs service,
String namespace,
ActivityInfo info,
DataConverter dataConverter,
ScheduledExecutorService heartbeatExecutor,
Scope metricsScope,
String identity,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval,
long localHeartbeatTimeoutBufferMillis) {
this.service = service;
this.metricsScope = metricsScope;
this.dataConverter = dataConverter;
Expand All @@ -83,6 +131,11 @@ public HeartbeatContextImpl(
info.getHeartbeatTimeout(),
maxHeartbeatThrottleInterval,
defaultHeartbeatThrottleInterval);
this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis();
this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis;
if (this.heartbeatTimeoutMillis > 0) {
this.heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos();
}
}

/**
Expand All @@ -95,6 +148,7 @@ public <V> void heartbeat(V details) throws ActivityCompletionException {
}
lock.lock();
try {
checkHeartbeatTimeoutDeadlineLocked();
receivedAHeartbeat = true;
lastDetails = details;
hasOutstandingHeartbeat = true;
Expand Down Expand Up @@ -167,6 +221,7 @@ public void cancelOutstandingHeartbeat() {
scheduledHeartbeat.cancel(false);
scheduledHeartbeat = null;
}
heartbeatTimeoutDeadlineNanos = 0;
hasOutstandingHeartbeat = false;
} finally {
lock.unlock();
Expand All @@ -179,6 +234,12 @@ private void doHeartBeatLocked(Object details) {
sendHeartbeatRequest(details);
hasOutstandingHeartbeat = false;
nextHeartbeatDelay = heartbeatIntervalMillis;
// Reset the local heartbeat timeout deadline only on successful send.
// If sends keep failing, the next heartbeat() call after the deadline will cancel the
// activity.
if (heartbeatTimeoutDeadlineNanos != 0) {
heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos();
}
} catch (StatusRuntimeException e) {
// Not rethrowing to not fail activity implementation on intermittent connection or Temporal
// errors.
Expand Down Expand Up @@ -214,6 +275,27 @@ private void scheduleNextHeartbeatLocked(long delay) {
TimeUnit.MILLISECONDS);
}

private long computeHeartbeatTimeoutDeadlineNanos() {
return System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis);
}

private void checkHeartbeatTimeoutDeadlineLocked() {
if (heartbeatTimedOut) {
throw new ActivityCanceledException(
info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT));
}
if (heartbeatTimeoutDeadlineNanos != 0 && System.nanoTime() >= heartbeatTimeoutDeadlineNanos) {
heartbeatTimedOut = true;
log.warn(
"Activity heartbeat timed out locally. ActivityId={}, activityType={}",
info.getActivityId(),
info.getActivityType());
throw new ActivityCanceledException(
info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT));
}
}

private void sendHeartbeatRequest(Object details) {
try {
RecordActivityTaskHeartbeatResponse status =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package io.temporal.internal.activity;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import com.uber.m3.tally.NoopScope;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.activity.ActivityInfo;
import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.client.ActivityCanceledException;
import io.temporal.client.ActivityCompletionException;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.failure.TimeoutFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testUtils.Eventually;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HeartbeatContextImplTest {

private static final long TEST_BUFFER_MILLIS = 200;

private ScheduledExecutorService heartbeatExecutor;
private WorkflowServiceStubs service;
private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub;

@Before
public void setUp() {
heartbeatExecutor = Executors.newScheduledThreadPool(1);
service = mock(WorkflowServiceStubs.class);
blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
when(service.blockingStub()).thenReturn(blockingStub);
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
}

@After
public void tearDown() {
heartbeatExecutor.shutdownNow();
}

@Test
public void heartbeatTimeoutLocallyCancelsActivity() {
Duration heartbeatTimeout = Duration.ofMillis(500);

// All heartbeat RPCs fail with UNAVAILABLE
when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));

ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
HeartbeatContextImpl ctx = createHeartbeatContext(info);

long startNanos = System.nanoTime();
ctx.heartbeat("details-1");

ActivityCompletionException caught =
Eventually.assertEventually(
Duration.ofSeconds(10),
() -> {
try {
ctx.heartbeat("poll");
fail("Expected ActivityCanceledException");
return null;
} catch (ActivityCompletionException e) {
return e;
}
});

long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();

assertSame(ActivityCanceledException.class, caught.getClass());
assertNotNull("Expected a TimeoutFailure cause", caught.getCause());
assertSame(TimeoutFailure.class, caught.getCause().getClass());
assertEquals(
TimeoutType.TIMEOUT_TYPE_HEARTBEAT, ((TimeoutFailure) caught.getCause()).getTimeoutType());
long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
assertTrue(
"Timeout should not fire before heartbeat timeout + buffer ("
+ elapsedMs
+ "ms elapsed, expected >= "
+ expectedMinMs
+ "ms)",
elapsedMs >= expectedMinMs);

ctx.cancelOutstandingHeartbeat();
}

@Test
public void heartbeatTimeoutResetsOnSuccessfulSend() {
Duration heartbeatTimeout = Duration.ofMillis(500);
AtomicInteger callCount = new AtomicInteger();

// First call succeeds, then all subsequent calls fail
when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenAnswer(
invocation -> {
if (callCount.getAndIncrement() == 0) {
return RecordActivityTaskHeartbeatResponse.getDefaultInstance();
}
throw new StatusRuntimeException(Status.UNAVAILABLE);
});

ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
HeartbeatContextImpl ctx = createHeartbeatContext(info);

// The first heartbeat() call sends the RPC synchronously (no scheduled heartbeat yet).
// Record the time before calling — the timer reset happens during this call.
long resetNanos = System.nanoTime();
ctx.heartbeat("details-1");
assertEquals("First RPC should have been the successful one", 1, callCount.get());

// Poll until the timeout fires again (from the reset point)
Eventually.assertEventually(
Duration.ofSeconds(10),
() -> {
try {
ctx.heartbeat("poll");
fail("Expected ActivityCanceledException");
} catch (ActivityCanceledException e) {
// expected
}
});

long elapsedSinceResetMs = Duration.ofNanos(System.nanoTime() - resetNanos).toMillis();
long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
assertTrue(
"Timeout should not fire before heartbeat timeout + buffer from reset point ("
+ elapsedSinceResetMs
+ "ms elapsed since reset, expected >= "
+ expectedMinMs
+ "ms)",
elapsedSinceResetMs >= expectedMinMs);

ctx.cancelOutstandingHeartbeat();
}

@Test
public void heartbeatTimeoutPersistsAcrossMultipleCalls() {
Duration heartbeatTimeout = Duration.ofMillis(500);

// All heartbeat RPCs fail with UNAVAILABLE
when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));

ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
HeartbeatContextImpl ctx = createHeartbeatContext(info);

ctx.heartbeat("details-1");

// Wait for timeout to fire
Eventually.assertEventually(
Duration.ofSeconds(10),
() -> {
try {
ctx.heartbeat("poll");
fail("Expected ActivityCanceledException");
} catch (ActivityCanceledException e) {
// expected
}
});

// Subsequent calls should continue to throw
for (int i = 0; i < 5; i++) {
try {
ctx.heartbeat("details-" + i);
fail("Expected ActivityCanceledException on call " + i);
} catch (ActivityCompletionException e) {
assertSame(ActivityCanceledException.class, e.getClass());
assertNotNull(e.getCause());
assertSame(TimeoutFailure.class, e.getCause().getClass());
}
}

ctx.cancelOutstandingHeartbeat();
}

private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) {
return new HeartbeatContextImpl(
service,
"test-namespace",
info,
GlobalDataConverter.get(),
heartbeatExecutor,
new NoopScope(),
"test-identity",
Duration.ofSeconds(60),
Duration.ofSeconds(30),
TEST_BUFFER_MILLIS);
}

private static ActivityInfo activityInfoWithHeartbeatTimeout(Duration heartbeatTimeout) {
ActivityInfo info = mock(ActivityInfo.class);
when(info.getHeartbeatTimeout()).thenReturn(heartbeatTimeout);
when(info.getTaskToken()).thenReturn(new byte[] {1, 2, 3});
when(info.getWorkflowId()).thenReturn("test-workflow-id");
when(info.getWorkflowType()).thenReturn("test-workflow-type");
when(info.getActivityType()).thenReturn("test-activity-type");
when(info.getActivityTaskQueue()).thenReturn("test-task-queue");
when(info.getActivityId()).thenReturn("test-activity-id");
when(info.isLocal()).thenReturn(false);
when(info.getHeartbeatDetails()).thenReturn(Optional.empty());
return info;
}
}
Loading