Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ jobs:
--dynamic-config-value nexusoperation.enableStandalone=true \
--dynamic-config-value history.enableChasm=true \
--dynamic-config-value history.enableCHASMSignalBacklinks=true \
--dynamic-config-value history.enableTransitionHistory=true &
--dynamic-config-value history.enableTransitionHistory=true \
--dynamic-config-value frontend.enableCancelWorkerPollsOnShutdown=true \
--dynamic-config-value frontend.workerCommandsEnabled=true \
--dynamic-config-value system.enableCancelActivityWorkerCommand=true &
sleep 10s

# Can't actually run tests against Java 8 because Mockito 5 requires Java 11+.
Expand Down
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- The SDK code is written for Java 8.

## Building and Testing
1. Format the code before committing:
1. Format the code before committing (and don't bother running spotlessCheck, just run apply):
```bash
./gradlew --offline spotlessApply
```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.temporal.activity;

import io.temporal.client.ActivityCanceledException;
import java.util.concurrent.CompletableFuture;

/** Token that allows an Activity implementation to observe cancellation requests. */
public interface ActivityCancellationToken {

ActivityCancellationToken NONE =
new ActivityCancellationToken() {
@Override
public boolean isCancellationRequested() {
return false;
}

@Override
public void throwIfCancellationRequested() throws ActivityCanceledException {}

@Override
public CompletableFuture<Void> getCancellationRequest() {
return new CompletableFuture<>();
}
};

/**
* Returns true after cancellation has been requested for this Activity Execution.
*
* <p>If this method returns true, the Activity implementation should stop its work and usually
* call {@link #throwIfCancellationRequested()} to report successful cancellation to Temporal.
*/
boolean isCancellationRequested();

/**
* Throws {@link ActivityCanceledException} if cancellation has been requested for this Activity
* Execution.
*
* <p>Rethrowing this exception from Activity code reports successful cancellation to Temporal.
*/
void throwIfCancellationRequested() throws ActivityCanceledException;

/**
* Future that completes when cancellation has been requested for this Activity Execution.
*
* <p>The future completes normally. Activity code should still call {@link
* #throwIfCancellationRequested()} or otherwise report cancellation if it wants the Activity
* Execution to complete as canceled.
*/
CompletableFuture<Void> getCancellationRequest();
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,18 @@ public interface ActivityExecutionContext {
*/
byte[] getTaskToken();

/**
* Returns a token that can be used by Activity code to observe cancellation requests without
* recording Heartbeats.
*/
default ActivityCancellationToken getCancellationToken() {
return ActivityCancellationToken.NONE;
}

/**
* If this method is called during an Activity Execution then the Activity Execution is not going
* to complete when it's method returns. It is expected to be completed asynchronously using
* {@link io.temporal.client.ActivityCompletionClient}.
* to complete when its method returns. It is expected to be completed asynchronously using {@link
* io.temporal.client.ActivityCompletionClient}.
*
* <p>Async Activity Executions that have {@link #isUseLocalManualCompletion()} set to false will
* not respect the limit defined by {@link WorkerOptions#getMaxConcurrentActivityExecutionSize()}.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.temporal.common.interceptors;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityCancellationToken;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.ManualActivityCompletionClient;
Expand Down Expand Up @@ -52,6 +53,11 @@ public byte[] getTaskToken() {
return next.getTaskToken();
}

@Override
public ActivityCancellationToken getCancellationToken() {
return next.getCancellationToken();
}

@Override
public void doNotCompleteOnReturn() {
next.doNotCompleteOnReturn();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.temporal.internal.activity;

import io.temporal.activity.ActivityCancellationToken;
import io.temporal.client.ActivityCanceledException;
import java.util.concurrent.CompletableFuture;

final class ActivityCancellationTokenImpl implements ActivityCancellationToken {
private final CompletableFuture<Void> cancellationRequest = new CompletableFuture<>();
private volatile ActivityCanceledException cancellationException;

@Override
public boolean isCancellationRequested() {
return cancellationException != null;
}

@Override
public void throwIfCancellationRequested() throws ActivityCanceledException {
ActivityCanceledException exception = cancellationException;
if (exception != null) {
throw exception;
}
}

@Override
public CompletableFuture<Void> getCancellationRequest() {
return cancellationRequest;
}

void requestCancel(ActivityCanceledException exception) {
cancellationException = exception;
cancellationRequest.complete(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,14 @@
public interface ActivityExecutionContextFactory {
InternalActivityExecutionContext createContext(
ActivityInfoInternal info, Object activity, Scope metricsScope);

/**
* Removes a context for a currently running activity identified by task token and optionally
* requests cancellation.
*
* @return true if the activity was found and cleaned up.
*/
default boolean cleanupContext(byte[] taskToken, boolean cancel) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import io.temporal.client.WorkflowClient;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;

public class ActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory {
Expand All @@ -17,6 +21,8 @@ public class ActivityExecutionContextFactoryImpl implements ActivityExecutionCon
private final DataConverter dataConverter;
private final ScheduledExecutorService heartbeatExecutor;
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
private final ConcurrentMap<ByteBuffer, ActivityExecutionContextImpl> activeContexts =
new ConcurrentHashMap<>();

public ActivityExecutionContextFactoryImpl(
WorkflowClient client,
Expand All @@ -42,18 +48,39 @@ public ActivityExecutionContextFactoryImpl(
@Override
public InternalActivityExecutionContext createContext(
ActivityInfoInternal info, Object activity, Scope metricsScope) {
return new ActivityExecutionContextImpl(
client,
namespace,
activity,
info,
dataConverter,
heartbeatExecutor,
manualCompletionClientFactory,
info.getCompletionHandle(),
metricsScope,
identity,
maxHeartbeatThrottleInterval,
defaultHeartbeatThrottleInterval);
ByteBuffer taskToken = taskTokenKey(info.getTaskToken());
ActivityExecutionContextImpl context =
new ActivityExecutionContextImpl(
client,
namespace,
activity,
info,
dataConverter,
heartbeatExecutor,
manualCompletionClientFactory,
info.getCompletionHandle(),
metricsScope,
identity,
maxHeartbeatThrottleInterval,
defaultHeartbeatThrottleInterval,
() -> cleanupContext(info.getTaskToken(), false));
activeContexts.put(taskToken, context);
return context;
}

@Override
public boolean cleanupContext(byte[] taskToken, boolean cancel) {
ActivityExecutionContextImpl context = activeContexts.remove(taskTokenKey(taskToken));
if (context == null) {
return false;
}
if (cancel) {
context.cancelFromWorkerCommand();
}
return true;
}

private static ByteBuffer taskTokenKey(byte[] taskToken) {
return ByteBuffer.wrap(Arrays.copyOf(taskToken, taskToken.length)).asReadOnlyBuffer();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.temporal.internal.activity;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityCancellationToken;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.ManualActivityCompletionClient;
Expand Down Expand Up @@ -32,6 +33,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
private final Functions.Proc completionHandle;
private final HeartbeatContext heartbeatContext;
private final Functions.Proc closeCallback;

private final Scope metricsScope;
private final ActivityInfo info;
Expand All @@ -51,12 +53,14 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
Scope metricsScope,
String identity,
Duration maxHeartbeatThrottleInterval,
Duration defaultHeartbeatThrottleInterval) {
Duration defaultHeartbeatThrottleInterval,
Functions.Proc closeCallback) {
this.client = client;
this.activity = activity;
this.metricsScope = metricsScope;
this.info = info;
this.completionHandle = completionHandle;
this.closeCallback = closeCallback;
this.manualCompletionClientFactory = manualCompletionClientFactory;
this.heartbeatContext =
new HeartbeatContextImpl(
Expand Down Expand Up @@ -105,6 +109,11 @@ public byte[] getTaskToken() {
return info.getTaskToken();
}

@Override
public ActivityCancellationToken getCancellationToken() {
return heartbeatContext.getCancellationToken();
}

@Override
public void doNotCompleteOnReturn() {
lock.lock();
Expand Down Expand Up @@ -170,6 +179,16 @@ public Object getLastHeartbeatValue() {
@Override
public void cancelOutstandingHeartbeat() {
heartbeatContext.cancelOutstandingHeartbeat();
closeCallback.apply();
}

@Override
public void asyncCompletionStarted() {
heartbeatContext.asyncCompletionStarted();
}

void cancelFromWorkerCommand() {
heartbeatContext.cancelFromWorkerCommand();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,13 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri
local,
dataConverterWithActivityContext);
} finally {
if (!context.isDoNotCompleteOnReturn()) {
// if the activity is not completed, we need to cancel the heartbeat
if (context.isDoNotCompleteOnReturn()) {
if (!context.isUseLocalManualCompletion()) {
context.asyncCompletionStarted();
}
executionContextFactory.cleanupContext(info.getTaskToken(), false);
} else {
// if the activity is completed, we need to cancel the heartbeat
// to avoid sending it after the activity is completed
context.cancelOutstandingHeartbeat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public boolean isTypeSupported(String type) {
return activities.get(type) != null || dynamicActivity != null;
}

public boolean requestCancel(byte[] taskToken) {
return executionContextFactory.cleanupContext(taskToken, true);
}

public void registerActivityImplementations(Object[] activitiesImplementation) {
for (Object activity : activitiesImplementation) {
registerActivityImplementation(activity);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.temporal.internal.activity;

import io.temporal.activity.ActivityCancellationToken;
import io.temporal.client.ActivityCompletionException;
import java.lang.reflect.Type;
import java.util.Optional;
Expand All @@ -23,6 +24,14 @@ interface HeartbeatContext {

Object getLatestHeartbeatDetails();

ActivityCancellationToken getCancellationToken();

/** Mark this activity as canceled by an external worker command. */
void cancelFromWorkerCommand();

/** Mark this activity as returned for async completion. */
void asyncCompletionStarted();

/** Cancel any pending heartbeat and discard cached heartbeat details. */
void cancelOutstandingHeartbeat();
}
Loading
Loading