diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java
index bbf5e198..34d7d549 100644
--- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java
+++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java
@@ -9,6 +9,13 @@
import com.microsoft.azure.functions.HttpStatus;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.DurableTaskGrpcClientFactory;
+import com.microsoft.durabletask.DurableEntityClient;
+import com.microsoft.durabletask.EntityInstanceId;
+import com.microsoft.durabletask.EntityMetadata;
+import com.microsoft.durabletask.EntityQuery;
+import com.microsoft.durabletask.EntityQueryResult;
+import com.microsoft.durabletask.CleanEntityStorageRequest;
+import com.microsoft.durabletask.CleanEntityStorageResult;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
@@ -133,6 +140,79 @@ public HttpManagementPayload createHttpManagementPayload(HttpRequestMessage> r
return this.getClientResponseLinks(request, instanceId);
}
+ /**
+ * Gets the entity client for interacting with durable entities.
+ *
+ * This mirrors the .NET SDK's {@code DurableTaskClient.Entities} property.
+ *
+ * @return the {@link DurableEntityClient} for this client
+ */
+ public DurableEntityClient getEntities() {
+ return getClient().getEntities();
+ }
+
+ /**
+ * Sends a fire-and-forget signal to a durable entity.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ * @param input the input to pass to the operation (may be {@code null})
+ */
+ public void signalEntity(EntityInstanceId entityId, String operationName, Object input) {
+ getClient().getEntities().signalEntity(entityId, operationName, input);
+ }
+
+ /**
+ * Sends a fire-and-forget signal to a durable entity with no input.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ */
+ public void signalEntity(EntityInstanceId entityId, String operationName) {
+ getClient().getEntities().signalEntity(entityId, operationName);
+ }
+
+ /**
+ * Gets the metadata for a durable entity, including optionally its serialized state.
+ *
+ * @param entityId the entity's instance ID
+ * @param includeState whether to include the entity's serialized state in the result
+ * @return the entity metadata, or {@code null} if the entity does not exist
+ */
+ public EntityMetadata getEntityMetadata(EntityInstanceId entityId, boolean includeState) {
+ return getClient().getEntities().getEntityMetadata(entityId, includeState);
+ }
+
+ /**
+ * Gets the metadata for a durable entity without including its serialized state.
+ *
+ * @param entityId the entity's instance ID
+ * @return the entity metadata, or {@code null} if the entity does not exist
+ */
+ public EntityMetadata getEntityMetadata(EntityInstanceId entityId) {
+ return getClient().getEntities().getEntityMetadata(entityId);
+ }
+
+ /**
+ * Queries the durable store for entity instances matching the specified filter criteria.
+ *
+ * @param query the query filter criteria
+ * @return the query result containing matching entities and an optional continuation token
+ */
+ public EntityQueryResult queryEntities(EntityQuery query) {
+ return getClient().getEntities().queryEntities(query);
+ }
+
+ /**
+ * Cleans up entity storage by removing empty entities and/or releasing orphaned locks.
+ *
+ * @param request the clean storage request specifying what to clean
+ * @return the result of the clean operation, including counts of removed entities and released locks
+ */
+ public CleanEntityStorageResult cleanEntityStorage(CleanEntityStorageRequest request) {
+ return getClient().getEntities().cleanEntityStorage(request);
+ }
+
private HttpManagementPayload getClientResponseLinks(HttpRequestMessage> request, String instanceId) {
String instanceStatusURL = this.getInstanceStatusURL(request, instanceId);
return new HttpManagementPayload(instanceId, instanceStatusURL, this.requiredQueryStringParameters);
diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableEntityTrigger.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableEntityTrigger.java
new file mode 100644
index 00000000..46ef4857
--- /dev/null
+++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableEntityTrigger.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (c) Microsoft Corporation. All rights reserved.
+ * Licensed under the MIT License. See License.txt in the project root for
+ * license information.
+ */
+
+package com.microsoft.durabletask.azurefunctions;
+
+import com.microsoft.azure.functions.annotation.CustomBinding;
+import com.microsoft.azure.functions.annotation.HasImplicitOutput;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ *
+ * Azure Functions attribute for binding a function parameter to a Durable Task entity request.
+ *
+ * The following is an example of an entity function that uses this trigger binding to implement
+ * a counter entity backed by a {@code TaskEntity} subclass.
+ *
+ *
+ * {@literal @}FunctionName("Counter")
+ * public String counterEntity(
+ * {@literal @}DurableEntityTrigger(name = "req") String req) {
+ * return EntityRunner.loadAndRun(req, () -> new CounterEntity());
+ * }
+ *
+ *
+ * @since 2.0.0
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.PARAMETER)
+@CustomBinding(direction = "in", name = "", type = "entityTrigger")
+@HasImplicitOutput
+public @interface DurableEntityTrigger {
+ /**
+ * The name of the entity function.
+ * If not specified, the function name is used as the name of the entity.
+ * This property supports binding parameters.
+ *
+ * @return The name of the entity function.
+ */
+ String entityName() default "";
+
+ /**
+ * The variable name used in function.json.
+ *
+ * @return The variable name used in function.json.
+ */
+ String name();
+
+ /**
+ *
+ * Defines how Functions runtime should treat the parameter value. Possible values are:
+ *
+ *
+ * - "": get the value as a string, and try to deserialize to actual parameter type like POJO
+ * - string: always get the value as a string
+ * - binary: get the value as a binary data, and try to deserialize to actual parameter type byte[]
+ *
+ *
+ * @return The dataType which will be used by the Functions runtime.
+ */
+ String dataType() default "string";
+}
diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/EntityMiddleware.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/EntityMiddleware.java
new file mode 100644
index 00000000..e9f2ea63
--- /dev/null
+++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/EntityMiddleware.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (c) Microsoft Corporation. All rights reserved.
+ * Licensed under the MIT License. See License.txt in the project root for
+ * license information.
+ */
+
+package com.microsoft.durabletask.azurefunctions.internal.middleware;
+
+import com.microsoft.azure.functions.internal.spi.middleware.Middleware;
+import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareChain;
+import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareContext;
+import com.microsoft.durabletask.DataConverter;
+
+/**
+ * Durable Function Entity Middleware
+ *
+ * This class is internal and is hence not for public use. Its APIs are unstable and can change
+ * at any time.
+ */
+public class EntityMiddleware implements Middleware {
+
+ private static final String ENTITY_TRIGGER = "DurableEntityTrigger";
+
+ @Override
+ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exception {
+ String parameterName = context.getParameterName(ENTITY_TRIGGER);
+ if (parameterName == null) {
+ chain.doNext(context);
+ return;
+ }
+
+ // The entity function receives the raw base64-encoded EntityBatchRequest as a String.
+ // The user function is expected to call EntityRunner.loadAndRun() with a TaskEntityFactory
+ // and return the base64-encoded EntityBatchResult.
+ //
+ // Unlike orchestrations, entity operations are simple request/response calls with no
+ // replay-based blocking (no OrchestratorBlockedException equivalent), so the middleware
+ // delegates directly to the user function.
+ try {
+ chain.doNext(context);
+ } catch (Exception e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof DataConverter.DataConverterException) {
+ throw (DataConverter.DataConverterException) cause;
+ }
+ throw new RuntimeException("Unexpected failure in entity function execution", e);
+ }
+ }
+}
diff --git a/azurefunctions/src/main/resources/META-INF/services/com.microsoft.azure.functions.internal.spi.middleware.Middleware b/azurefunctions/src/main/resources/META-INF/services/com.microsoft.azure.functions.internal.spi.middleware.Middleware
index 26168496..0ba98d04 100644
--- a/azurefunctions/src/main/resources/META-INF/services/com.microsoft.azure.functions.internal.spi.middleware.Middleware
+++ b/azurefunctions/src/main/resources/META-INF/services/com.microsoft.azure.functions.internal.spi.middleware.Middleware
@@ -1 +1,2 @@
-com.microsoft.durabletask.azurefunctions.internal.middleware.OrchestrationMiddleware
\ No newline at end of file
+com.microsoft.durabletask.azurefunctions.internal.middleware.OrchestrationMiddleware
+com.microsoft.durabletask.azurefunctions.internal.middleware.EntityMiddleware
\ No newline at end of file
diff --git a/azuremanaged/build.gradle b/azuremanaged/build.gradle
index bdd640be..293d86e7 100644
--- a/azuremanaged/build.gradle
+++ b/azuremanaged/build.gradle
@@ -58,7 +58,8 @@ compileTestJava {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
options.fork = true
- options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac"
+ def javacExe = org.gradle.internal.os.OperatingSystem.current().isWindows() ? 'javac.exe' : 'javac'
+ options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/${javacExe}"
}
test {
diff --git a/client/build.gradle b/client/build.gradle
index 350ccf5b..23267782 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -60,7 +60,8 @@ compileTestJava {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
options.fork = true
- options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac"
+ def javacExe = org.gradle.internal.os.OperatingSystem.current().isWindows() ? 'javac.exe' : 'javac'
+ options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/${javacExe}"
}
task downloadProtoFiles {
@@ -110,7 +111,8 @@ sourceSets {
}
tasks.withType(Test) {
- executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", 'bin/java')
+ def javaExe = org.gradle.internal.os.OperatingSystem.current().isWindows() ? 'java.exe' : 'java'
+ executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", "bin/${javaExe}")
}
test {
diff --git a/client/src/main/java/com/microsoft/durabletask/CallEntityOptions.java b/client/src/main/java/com/microsoft/durabletask/CallEntityOptions.java
new file mode 100644
index 00000000..3b92ff32
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/CallEntityOptions.java
@@ -0,0 +1,41 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.time.Duration;
+
+/**
+ * Options for calling a durable entity and waiting for a response.
+ */
+public final class CallEntityOptions {
+ private Duration timeout;
+
+ /**
+ * Creates a new {@code CallEntityOptions} with default settings.
+ */
+ public CallEntityOptions() {
+ }
+
+ /**
+ * Sets the timeout for the entity call. If the entity does not respond within this duration,
+ * the call will fail with a timeout exception.
+ *
+ * @param timeout the maximum duration to wait for a response
+ * @return this {@code CallEntityOptions} object for chaining
+ */
+ public CallEntityOptions setTimeout(@Nullable Duration timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Gets the timeout for the entity call, or {@code null} if no timeout is configured.
+ *
+ * @return the timeout duration, or {@code null}
+ */
+ @Nullable
+ public Duration getTimeout() {
+ return this.timeout;
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/CleanEntityStorageRequest.java b/client/src/main/java/com/microsoft/durabletask/CleanEntityStorageRequest.java
new file mode 100644
index 00000000..69a2a852
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/CleanEntityStorageRequest.java
@@ -0,0 +1,109 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+
+/**
+ * Represents a request to clean up entity storage by removing empty entities and/or releasing orphaned locks.
+ *
+ * Use the builder-style setters to configure the request, then pass it to
+ * {@link DurableEntityClient#cleanEntityStorage(CleanEntityStorageRequest)}.
+ */
+public final class CleanEntityStorageRequest {
+ private String continuationToken;
+ private boolean removeEmptyEntities = true;
+ private boolean releaseOrphanedLocks = true;
+
+ /**
+ * Creates a new {@code CleanEntityStorageRequest} with default settings.
+ * By default, both {@code removeEmptyEntities} and {@code releaseOrphanedLocks} are {@code true}.
+ */
+ public CleanEntityStorageRequest() {
+ }
+
+ /**
+ * Sets the continuation token for resuming a previous clean operation.
+ *
+ * @param continuationToken the continuation token, or {@code null} to start from the beginning
+ * @return this {@code CleanEntityStorageRequest} for chaining
+ */
+ public CleanEntityStorageRequest setContinuationToken(@Nullable String continuationToken) {
+ this.continuationToken = continuationToken;
+ return this;
+ }
+
+ /**
+ * Gets the continuation token for resuming a previous clean operation.
+ *
+ * @return the continuation token, or {@code null}
+ */
+ @Nullable
+ public String getContinuationToken() {
+ return this.continuationToken;
+ }
+
+ /**
+ * Sets whether to remove entities that have no state and no pending operations.
+ *
+ * @param removeEmptyEntities {@code true} to remove empty entities
+ * @return this {@code CleanEntityStorageRequest} for chaining
+ */
+ public CleanEntityStorageRequest setRemoveEmptyEntities(boolean removeEmptyEntities) {
+ this.removeEmptyEntities = removeEmptyEntities;
+ return this;
+ }
+
+ /**
+ * Gets whether empty entities should be removed.
+ *
+ * @return {@code true} if empty entities will be removed
+ */
+ public boolean isRemoveEmptyEntities() {
+ return this.removeEmptyEntities;
+ }
+
+ /**
+ * Sets whether to release locks held by orchestrations that no longer exist.
+ *
+ * @param releaseOrphanedLocks {@code true} to release orphaned locks
+ * @return this {@code CleanEntityStorageRequest} for chaining
+ */
+ public CleanEntityStorageRequest setReleaseOrphanedLocks(boolean releaseOrphanedLocks) {
+ this.releaseOrphanedLocks = releaseOrphanedLocks;
+ return this;
+ }
+
+ /**
+ * Gets whether orphaned locks should be released.
+ *
+ * @return {@code true} if orphaned locks will be released
+ */
+ public boolean isReleaseOrphanedLocks() {
+ return this.releaseOrphanedLocks;
+ }
+
+ /**
+ * Sets whether the client should automatically continue cleaning with continuation tokens
+ * until all entities have been processed. When {@code true}, the client will loop internally,
+ * accumulating results across multiple pages.
+ *
+ * @param continueUntilComplete {@code true} to automatically continue until complete
+ * @return this {@code CleanEntityStorageRequest} for chaining
+ */
+ public CleanEntityStorageRequest setContinueUntilComplete(boolean continueUntilComplete) {
+ this.continueUntilComplete = continueUntilComplete;
+ return this;
+ }
+
+ /**
+ * Gets whether the client should automatically continue cleaning until all entities are processed.
+ *
+ * @return {@code true} if the client will automatically continue until complete
+ */
+ public boolean isContinueUntilComplete() {
+ return this.continueUntilComplete;
+ }
+
+ private boolean continueUntilComplete = true;
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/CleanEntityStorageResult.java b/client/src/main/java/com/microsoft/durabletask/CleanEntityStorageResult.java
new file mode 100644
index 00000000..f5ad588d
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/CleanEntityStorageResult.java
@@ -0,0 +1,59 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+
+/**
+ * Represents the result of a {@link DurableEntityClient#cleanEntityStorage(CleanEntityStorageRequest)} operation.
+ */
+public final class CleanEntityStorageResult {
+ private final String continuationToken;
+ private final int emptyEntitiesRemoved;
+ private final int orphanedLocksReleased;
+
+ /**
+ * Creates a new {@code CleanEntityStorageResult}.
+ *
+ * @param continuationToken the continuation token for resuming the clean operation, or {@code null} if complete
+ * @param emptyEntitiesRemoved the number of empty entities removed in this batch
+ * @param orphanedLocksReleased the number of orphaned locks released in this batch
+ */
+ CleanEntityStorageResult(
+ @Nullable String continuationToken,
+ int emptyEntitiesRemoved,
+ int orphanedLocksReleased) {
+ this.continuationToken = continuationToken;
+ this.emptyEntitiesRemoved = emptyEntitiesRemoved;
+ this.orphanedLocksReleased = orphanedLocksReleased;
+ }
+
+ /**
+ * Gets the continuation token for resuming the clean operation.
+ * If {@code null}, the clean operation has processed all entities.
+ *
+ * @return the continuation token, or {@code null} if the operation is complete
+ */
+ @Nullable
+ public String getContinuationToken() {
+ return this.continuationToken;
+ }
+
+ /**
+ * Gets the number of empty entities that were removed in this batch.
+ *
+ * @return the count of empty entities removed
+ */
+ public int getEmptyEntitiesRemoved() {
+ return this.emptyEntitiesRemoved;
+ }
+
+ /**
+ * Gets the number of orphaned locks that were released in this batch.
+ *
+ * @return the count of orphaned locks released
+ */
+ public int getOrphanedLocksReleased() {
+ return this.orphanedLocksReleased;
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableEntityClient.java b/client/src/main/java/com/microsoft/durabletask/DurableEntityClient.java
new file mode 100644
index 00000000..61fcba6c
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/DurableEntityClient.java
@@ -0,0 +1,225 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+
+/**
+ * Client for interacting with durable entities.
+ *
+ * This class provides operations for signaling entities, querying entity metadata,
+ * and performing entity storage maintenance. Instances are obtained from
+ * {@link DurableTaskClient#getEntities()}.
+ *
+ * This design mirrors the .NET SDK's {@code DurableEntityClient} which is accessed
+ * via the {@code DurableTaskClient.Entities} property.
+ */
+public abstract class DurableEntityClient {
+
+ private final String name;
+
+ /**
+ * Creates a new {@code DurableEntityClient} instance.
+ *
+ * @param name the name of the client
+ */
+ protected DurableEntityClient(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Gets the name of this client.
+ *
+ * @return the client name
+ */
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * Sends a signal to a durable entity instance without waiting for a response.
+ *
+ * If the target entity does not exist, it will be created automatically when it receives the signal.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ */
+ public void signalEntity(EntityInstanceId entityId, String operationName) {
+ this.signalEntity(entityId, operationName, null, null);
+ }
+
+ /**
+ * Sends a signal with input to a durable entity instance without waiting for a response.
+ *
+ * If the target entity does not exist, it will be created automatically when it receives the signal.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ * @param input the serializable input for the operation, or {@code null}
+ */
+ public void signalEntity(EntityInstanceId entityId, String operationName, @Nullable Object input) {
+ this.signalEntity(entityId, operationName, input, null);
+ }
+
+ /**
+ * Sends a signal with input and options to a durable entity instance without waiting for a response.
+ *
+ * If the target entity does not exist, it will be created automatically when it receives the signal.
+ * Use {@link SignalEntityOptions#setScheduledTime(java.time.Instant)} to schedule the signal for
+ * delivery at a future time.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ * @param input the serializable input for the operation, or {@code null}
+ * @param options additional options for the signal, or {@code null}
+ */
+ public abstract void signalEntity(
+ EntityInstanceId entityId,
+ String operationName,
+ @Nullable Object input,
+ @Nullable SignalEntityOptions options);
+
+ /**
+ * Fetches the metadata for a durable entity instance, including its state by default.
+ *
+ * This matches the .NET SDK behavior where {@code includeState} defaults to {@code true}.
+ *
+ * @param entityId the entity instance ID to query
+ * @return the entity metadata, or {@code null} if the entity does not exist
+ */
+ @Nullable
+ public EntityMetadata getEntityMetadata(EntityInstanceId entityId) {
+ return this.getEntityMetadata(entityId, true);
+ }
+
+ /**
+ * Fetches the metadata for a durable entity instance, optionally including its state.
+ *
+ * @param entityId the entity instance ID to query
+ * @param includeState {@code true} to include the entity's serialized state in the result
+ * @return the entity metadata, or {@code null} if the entity does not exist
+ */
+ @Nullable
+ public abstract EntityMetadata getEntityMetadata(EntityInstanceId entityId, boolean includeState);
+
+ /**
+ * Fetches the metadata for a durable entity instance with typed state access.
+ *
+ * This always includes state in the result, matching the .NET SDK's
+ * {@code GetEntityAsync()} pattern. The returned {@link TypedEntityMetadata} provides
+ * a {@link TypedEntityMetadata#getState()} method for direct typed state access.
+ *
+ * {@code
+ * TypedEntityMetadata metadata = client.getEntities()
+ * .getEntityMetadata(entityId, Integer.class);
+ * if (metadata != null) {
+ * Integer state = metadata.getState();
+ * System.out.println("Counter value: " + state);
+ * }
+ * }
+ *
+ * @param entityId the entity instance ID to query
+ * @param stateType the class to deserialize the entity's state into
+ * @param the entity state type
+ * @return the typed entity metadata with state, or {@code null} if the entity does not exist
+ */
+ @Nullable
+ public TypedEntityMetadata getEntityMetadata(EntityInstanceId entityId, Class stateType) {
+ EntityMetadata metadata = this.getEntityMetadata(entityId, true);
+ if (metadata == null) {
+ return null;
+ }
+ return new TypedEntityMetadata<>(metadata, stateType);
+ }
+
+ /**
+ * Queries the durable store for entity instances matching the specified filter criteria.
+ *
+ * @param query the query filter criteria
+ * @return the query result containing matching entities and an optional continuation token
+ */
+ public abstract EntityQueryResult queryEntities(EntityQuery query);
+
+ /**
+ * Returns an auto-paginating iterable over entity instances matching the specified filter criteria.
+ *
+ * This method automatically handles pagination when iterating over results. It fetches pages
+ * from the store on demand, making it convenient when you want to process all matching entities
+ * without manually managing continuation tokens.
+ *
+ * You can iterate over individual items:
+ *
{@code
+ * for (EntityMetadata entity : client.getEntities().getAllEntities(query)) {
+ * System.out.println(entity.getEntityInstanceId());
+ * }
+ * }
+ *
+ * Or iterate page by page for more control:
+ *
{@code
+ * for (EntityQueryResult page : client.getEntities().getAllEntities(query).byPage()) {
+ * for (EntityMetadata entity : page.getEntities()) {
+ * System.out.println(entity.getEntityInstanceId());
+ * }
+ * }
+ * }
+ *
+ * @param query the query filter criteria
+ * @return a pageable iterable over all matching entities
+ */
+ public EntityQueryPageable getAllEntities(EntityQuery query) {
+ return new EntityQueryPageable(query, this::queryEntities);
+ }
+
+ /**
+ * Returns an auto-paginating iterable over all entity instances.
+ *
+ * This is a convenience overload equivalent to {@code getAllEntities(new EntityQuery())}.
+ *
+ * @return a pageable iterable over all entities
+ */
+ public EntityQueryPageable getAllEntities() {
+ return getAllEntities(new EntityQuery());
+ }
+
+ /**
+ * Returns an auto-paginating iterable over entity instances matching the specified filter criteria,
+ * with typed state access.
+ *
+ * This mirrors the .NET SDK's {@code GetAllEntitiesAsync()} pattern. Entity state is always
+ * included in the results and eagerly deserialized into the specified type. Each item is a
+ * {@link TypedEntityMetadata} with a {@link TypedEntityMetadata#getState()} accessor.
+ *
+ * Note: A copy of the query is made with {@code includeState} set to {@code true} so the
+ * original query is not modified.
+ *
+ *
{@code
+ * EntityQuery query = new EntityQuery().setInstanceIdStartsWith("counter");
+ * for (TypedEntityMetadata entity : client.getEntities().getAllEntities(query, Integer.class)) {
+ * Integer state = entity.getState();
+ * System.out.println("Counter value: " + state);
+ * }
+ * }
+ *
+ * @param query the query filter criteria
+ * @param stateType the class to deserialize each entity's state into
+ * @param the entity state type
+ * @return a pageable iterable over all matching entities with typed state
+ */
+ public TypedEntityQueryPageable getAllEntities(EntityQuery query, Class stateType) {
+ // Create a copy with includeState=true so we don't mutate the caller's query
+ EntityQuery typedQuery = query.copy().setIncludeState(true);
+ EntityQueryPageable inner = new EntityQueryPageable(typedQuery, this::queryEntities);
+ return new TypedEntityQueryPageable<>(inner, stateType);
+ }
+
+ /**
+ * Cleans up entity storage by removing empty entities and/or releasing orphaned locks.
+ *
+ * This is an administrative operation that can be used to reclaim storage space and fix
+ * entity state inconsistencies.
+ *
+ * @param request the clean storage request specifying what to clean
+ * @return the result of the clean operation, including counts of removed entities and released locks
+ */
+ public abstract CleanEntityStorageResult cleanEntityStorage(CleanEntityStorageRequest request);
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java
index 1e1b3cb0..adb33673 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java
@@ -356,4 +356,142 @@ public void resumeInstance(String instanceId) {
* @param reason the reason for resuming the orchestration instance
*/
public abstract void resumeInstance(String instanceId, @Nullable String reason);
+
+ // region Entity APIs
+
+ /**
+ * Gets the entity client for interacting with durable entities.
+ *
+ * This mirrors the .NET SDK's {@code DurableTaskClient.Entities} property, providing a
+ * dedicated client for entity operations such as signaling, querying, and storage management.
+ *
+ * @return the {@link DurableEntityClient} for this client
+ * @throws UnsupportedOperationException if the current client implementation does not support entities
+ */
+ public DurableEntityClient getEntities() {
+ throw new UnsupportedOperationException("Entity operations are not supported by this client implementation.");
+ }
+
+ /**
+ * Sends a signal to a durable entity instance without waiting for a response.
+ *
+ * If the target entity does not exist, it will be created automatically when it receives the signal.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ * @deprecated Use {@code getEntities().signalEntity(entityId, operationName)} instead.
+ */
+ @Deprecated
+ public void signalEntity(EntityInstanceId entityId, String operationName) {
+ this.getEntities().signalEntity(entityId, operationName);
+ }
+
+ /**
+ * Sends a signal with input to a durable entity instance without waiting for a response.
+ *
+ * If the target entity does not exist, it will be created automatically when it receives the signal.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ * @param input the serializable input for the operation, or {@code null}
+ * @deprecated Use {@code getEntities().signalEntity(entityId, operationName, input)} instead.
+ */
+ @Deprecated
+ public void signalEntity(EntityInstanceId entityId, String operationName, @Nullable Object input) {
+ this.getEntities().signalEntity(entityId, operationName, input);
+ }
+
+ /**
+ * Sends a signal with input and options to a durable entity instance without waiting for a response.
+ *
+ * @param entityId the target entity's instance ID
+ * @param operationName the name of the operation to invoke on the entity
+ * @param input the serializable input for the operation, or {@code null}
+ * @param options additional options for the signal, or {@code null}
+ * @deprecated Use {@code getEntities().signalEntity(entityId, operationName, input, options)} instead.
+ */
+ @Deprecated
+ public void signalEntity(
+ EntityInstanceId entityId,
+ String operationName,
+ @Nullable Object input,
+ @Nullable SignalEntityOptions options) {
+ this.getEntities().signalEntity(entityId, operationName, input, options);
+ }
+
+ /**
+ * Fetches the metadata for a durable entity instance, excluding its state.
+ *
+ * @param entityId the entity instance ID to query
+ * @return the entity metadata, or {@code null} if the entity does not exist
+ * @deprecated Use {@code getEntities().getEntityMetadata(entityId)} instead.
+ */
+ @Deprecated
+ @Nullable
+ public EntityMetadata getEntityMetadata(EntityInstanceId entityId) {
+ return this.getEntities().getEntityMetadata(entityId);
+ }
+
+ /**
+ * Fetches the metadata for a durable entity instance, optionally including its state.
+ *
+ * @param entityId the entity instance ID to query
+ * @param includeState {@code true} to include the entity's serialized state in the result
+ * @return the entity metadata, or {@code null} if the entity does not exist
+ * @deprecated Use {@code getEntities().getEntityMetadata(entityId, includeState)} instead.
+ */
+ @Deprecated
+ @Nullable
+ public EntityMetadata getEntityMetadata(EntityInstanceId entityId, boolean includeState) {
+ return this.getEntities().getEntityMetadata(entityId, includeState);
+ }
+
+ /**
+ * Queries the durable store for entity instances matching the specified filter criteria.
+ *
+ * @param query the query filter criteria
+ * @return the query result containing matching entities and an optional continuation token
+ * @deprecated Use {@code getEntities().queryEntities(query)} instead.
+ */
+ @Deprecated
+ public EntityQueryResult queryEntities(EntityQuery query) {
+ return this.getEntities().queryEntities(query);
+ }
+
+ /**
+ * Returns an auto-paginating iterable over entity instances matching the specified filter criteria.
+ *
+ * @param query the query filter criteria
+ * @return a pageable iterable over all matching entities
+ * @deprecated Use {@code getEntities().getAllEntities(query)} instead.
+ */
+ @Deprecated
+ public EntityQueryPageable getAllEntities(EntityQuery query) {
+ return this.getEntities().getAllEntities(query);
+ }
+
+ /**
+ * Returns an auto-paginating iterable over all entity instances.
+ *
+ * @return a pageable iterable over all entities
+ * @deprecated Use {@code getEntities().getAllEntities()} instead.
+ */
+ @Deprecated
+ public EntityQueryPageable getAllEntities() {
+ return this.getEntities().getAllEntities();
+ }
+
+ /**
+ * Cleans up entity storage by removing empty entities and/or releasing orphaned locks.
+ *
+ * @param request the clean storage request specifying what to clean
+ * @return the result of the clean operation, including counts of removed entities and released locks
+ * @deprecated Use {@code getEntities().cleanEntityStorage(request)} instead.
+ */
+ @Deprecated
+ public CleanEntityStorageResult cleanEntityStorage(CleanEntityStorageRequest request) {
+ return this.getEntities().cleanEntityStorage(request);
+ }
+
+ // endregion
}
\ No newline at end of file
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java
index 14955120..8778252a 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java
@@ -32,6 +32,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
private final ManagedChannel managedSidecarChannel;
private final TaskHubSidecarServiceBlockingStub sidecarClient;
private final String defaultVersion;
+ private final GrpcDurableEntityClient entityClient;
DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
@@ -58,6 +59,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
}
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
+ this.entityClient = new GrpcDurableEntityClient("GrpcDurableEntityClient", this.sidecarClient, this.dataConverter);
}
DurableTaskGrpcClient(int port, String defaultVersion) {
@@ -70,6 +72,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
.usePlaintext()
.build();
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(this.managedSidecarChannel);
+ this.entityClient = new GrpcDurableEntityClient("GrpcDurableEntityClient", this.sidecarClient, this.dataConverter);
}
/**
@@ -399,4 +402,13 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI
private PurgeResult toPurgeResult(PurgeInstancesResponse response){
return new PurgeResult(response.getDeletedInstanceCount());
}
+
+ // region Entity APIs
+
+ @Override
+ public DurableEntityClient getEntities() {
+ return this.entityClient;
+ }
+
+ // endregion
}
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java
index 284b7090..b2484feb 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java
@@ -19,6 +19,8 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -34,17 +36,22 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final HashMap orchestrationFactories = new HashMap<>();
private final HashMap activityFactories = new HashMap<>();
+ private final HashMap entityFactories = new HashMap<>();
private final ManagedChannel managedSidecarChannel;
private final DataConverter dataConverter;
private final Duration maximumTimerInterval;
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
+ private final int maxConcurrentEntityWorkItems;
+ private final ExecutorService workItemExecutor;
private final TaskHubSidecarServiceBlockingStub sidecarClient;
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
this.orchestrationFactories.putAll(builder.orchestrationFactories);
this.activityFactories.putAll(builder.activityFactories);
+ this.entityFactories.putAll(builder.entityFactories);
+ this.maxConcurrentEntityWorkItems = builder.maxConcurrentEntityWorkItems;
Channel sidecarGrpcChannel;
if (builder.channel != null) {
@@ -70,6 +77,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.versioningOptions = builder.versioningOptions;
+ this.workItemExecutor = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "durabletask-worker");
+ t.setDaemon(true);
+ return t;
+ });
}
/**
@@ -90,6 +102,15 @@ public void start() {
* configured.
*/
public void close() {
+ this.workItemExecutor.shutdown();
+ try {
+ if (!this.workItemExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ this.workItemExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ this.workItemExecutor.shutdownNow();
+ }
+
if (this.managedSidecarChannel != null) {
try {
this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
@@ -123,16 +144,26 @@ public void startAndBlock() {
this.dataConverter,
this.maximumTimerInterval,
logger,
- this.versioningOptions);
+ this.versioningOptions,
+ true);
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
this.activityFactories,
this.dataConverter,
logger);
+ TaskEntityExecutor taskEntityExecutor = new TaskEntityExecutor(
+ this.entityFactories,
+ this.dataConverter,
+ logger);
// TODO: How do we interrupt manually?
while (true) {
try {
- GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
+ GetWorkItemsRequest.Builder requestBuilder = GetWorkItemsRequest.newBuilder();
+ if (!this.entityFactories.isEmpty()) {
+ // Signal to the sidecar that this worker can handle entity work items
+ requestBuilder.setMaxConcurrentEntityWorkItems(this.maxConcurrentEntityWorkItems);
+ }
+ GetWorkItemsRequest getWorkItemsRequest = requestBuilder.build();
Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
WorkItem workItem = workItemStream.next();
@@ -340,42 +371,142 @@ public void startAndBlock() {
spanAttributes);
Scope activityScope = activitySpan.makeCurrent();
- // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
- String output = null;
- TaskFailureDetails failureDetails = null;
- Throwable activityError = null;
- try {
- output = taskActivityExecutor.execute(
- activityRequest.getName(),
- activityRequest.getInput().getValue(),
- activityRequest.getTaskId());
- } catch (Throwable e) {
- activityError = e;
- failureDetails = TaskFailureDetails.newBuilder()
- .setErrorType(e.getClass().getName())
- .setErrorMessage(e.getMessage())
- .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
- .build();
- } finally {
- activityScope.close();
- TracingHelper.endSpan(activitySpan, activityError);
- }
+ this.workItemExecutor.submit(() -> {
+ String output = null;
+ TaskFailureDetails failureDetails = null;
+ Throwable activityError = null;
+ try {
+ output = taskActivityExecutor.execute(
+ activityRequest.getName(),
+ activityRequest.getInput().getValue(),
+ activityRequest.getTaskId());
+ } catch (Throwable e) {
+ activityError = e;
+ failureDetails = TaskFailureDetails.newBuilder()
+ .setErrorType(e.getClass().getName())
+ .setErrorMessage(e.getMessage())
+ .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
+ .build();
+ } finally {
+ activityScope.close();
+ TracingHelper.endSpan(activitySpan, activityError);
+ }
- ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
- .setInstanceId(activityInstanceId)
- .setTaskId(activityRequest.getTaskId())
- .setCompletionToken(workItem.getCompletionToken());
+ ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
+ .setInstanceId(activityInstanceId)
+ .setTaskId(activityRequest.getTaskId())
+ .setCompletionToken(workItem.getCompletionToken());
- if (output != null) {
- responseBuilder.setResult(StringValue.of(output));
- }
+ if (output != null) {
+ responseBuilder.setResult(StringValue.of(output));
+ }
- if (failureDetails != null) {
- responseBuilder.setFailureDetails(failureDetails);
- }
+ if (failureDetails != null) {
+ responseBuilder.setFailureDetails(failureDetails);
+ }
+
+ this.sidecarClient.completeActivityTask(responseBuilder.build());
+ });
+ } else if (requestType == RequestCase.ENTITYREQUEST) {
+ EntityBatchRequest entityRequest = workItem.getEntityRequest();
+ this.workItemExecutor.submit(() -> {
+ try {
+ EntityBatchResult result = taskEntityExecutor.execute(entityRequest);
+ EntityBatchResult responseWithToken = result.toBuilder()
+ .setCompletionToken(workItem.getCompletionToken())
+ .build();
+ this.sidecarClient.completeEntityTask(responseWithToken);
+ } catch (Exception e) {
+ logger.log(Level.WARNING,
+ String.format("Failed to execute entity batch for '%s'. Abandoning work item.",
+ entityRequest.getInstanceId()),
+ e);
+ this.sidecarClient.abandonTaskEntityWorkItem(
+ AbandonEntityTaskRequest.newBuilder()
+ .setCompletionToken(workItem.getCompletionToken())
+ .build());
+ }
+ });
+ } else if (requestType == RequestCase.ENTITYREQUESTV2) {
+ EntityRequest entityRequestV2 = workItem.getEntityRequestV2();
+ this.workItemExecutor.submit(() -> {
+ try {
+ // Convert V2 (history-based) format to V1 (flat) format
+ EntityBatchRequest.Builder batchBuilder = EntityBatchRequest.newBuilder()
+ .setInstanceId(entityRequestV2.getInstanceId());
+ if (entityRequestV2.hasEntityState()) {
+ batchBuilder.setEntityState(entityRequestV2.getEntityState());
+ }
- this.sidecarClient.completeActivityTask(responseBuilder.build());
- }
+ List operationInfos = new ArrayList<>();
+ for (HistoryEvent event : entityRequestV2.getOperationRequestsList()) {
+ if (event.hasEntityOperationSignaled()) {
+ EntityOperationSignaledEvent signaled = event.getEntityOperationSignaled();
+ OperationRequest.Builder opBuilder = OperationRequest.newBuilder()
+ .setRequestId(signaled.getRequestId())
+ .setOperation(signaled.getOperation());
+ if (signaled.hasInput()) {
+ opBuilder.setInput(signaled.getInput());
+ }
+ batchBuilder.addOperations(opBuilder.build());
+ // Fire-and-forget: no response destination
+ operationInfos.add(OperationInfo.newBuilder()
+ .setRequestId(signaled.getRequestId())
+ .build());
+ } else if (event.hasEntityOperationCalled()) {
+ EntityOperationCalledEvent called = event.getEntityOperationCalled();
+ OperationRequest.Builder opBuilder = OperationRequest.newBuilder()
+ .setRequestId(called.getRequestId())
+ .setOperation(called.getOperation());
+ if (called.hasInput()) {
+ opBuilder.setInput(called.getInput());
+ }
+ batchBuilder.addOperations(opBuilder.build());
+ // Two-way call: include response destination
+ OperationInfo.Builder infoBuilder = OperationInfo.newBuilder()
+ .setRequestId(called.getRequestId());
+ if (called.hasParentInstanceId()) {
+ OrchestrationInstance.Builder destBuilder = OrchestrationInstance.newBuilder()
+ .setInstanceId(called.getParentInstanceId().getValue());
+ if (called.hasParentExecutionId()) {
+ destBuilder.setExecutionId(StringValue.of(called.getParentExecutionId().getValue()));
+ }
+ infoBuilder.setResponseDestination(destBuilder.build());
+ }
+ operationInfos.add(infoBuilder.build());
+ } else {
+ logger.log(Level.WARNING,
+ "Skipping unsupported history event type in ENTITYREQUESTV2: {0}",
+ event.getEventTypeCase());
+ }
+ }
+
+ EntityBatchRequest batchRequest = batchBuilder.build();
+ EntityBatchResult result = taskEntityExecutor.execute(batchRequest);
+
+ // Attach completion token and operation infos for response routing
+ EntityBatchResult.Builder responseBuilder = result.toBuilder()
+ .setCompletionToken(workItem.getCompletionToken());
+ // Trim operationInfos to match actual result count
+ int resultCount = result.getResultsCount();
+ if (operationInfos.size() > resultCount) {
+ responseBuilder.addAllOperationInfos(operationInfos.subList(0, resultCount));
+ } else {
+ responseBuilder.addAllOperationInfos(operationInfos);
+ }
+ this.sidecarClient.completeEntityTask(responseBuilder.build());
+ } catch (Exception e) {
+ logger.log(Level.WARNING,
+ String.format("Failed to execute V2 entity batch for '%s'. Abandoning work item.",
+ entityRequestV2.getInstanceId()),
+ e);
+ this.sidecarClient.abandonTaskEntityWorkItem(
+ AbandonEntityTaskRequest.newBuilder()
+ .setCompletionToken(workItem.getCompletionToken())
+ .build());
+ }
+ });
+ }
else if (requestType == RequestCase.HEALTHPING)
{
// No-op
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java
index ec39fee2..c0d7d9df 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java
@@ -4,8 +4,10 @@
import io.grpc.Channel;
+import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.HashMap;
+import java.util.Locale;
/**
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.
@@ -13,11 +15,13 @@
public final class DurableTaskGrpcWorkerBuilder {
final HashMap orchestrationFactories = new HashMap<>();
final HashMap activityFactories = new HashMap<>();
+ final HashMap entityFactories = new HashMap<>();
int port;
Channel channel;
DataConverter dataConverter;
Duration maximumTimerInterval;
DurableTaskGrpcWorkerVersioningOptions versioningOptions;
+ int maxConcurrentEntityWorkItems = 1;
/**
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
@@ -62,6 +66,127 @@ public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) {
return this;
}
+ /**
+ * Adds an entity factory to be used by the constructed {@link DurableTaskGrpcWorker}.
+ *
+ * @param name the name of the entity type
+ * @param factory the factory that creates instances of the entity
+ * @return this builder object
+ */
+ public DurableTaskGrpcWorkerBuilder addEntity(String name, TaskEntityFactory factory) {
+ if (name == null || name.isEmpty()) {
+ throw new IllegalArgumentException("A non-empty entity name is required.");
+ }
+ if (factory == null) {
+ throw new IllegalArgumentException("An entity factory is required.");
+ }
+
+ String key = name.toLowerCase(Locale.ROOT);
+ if (this.entityFactories.containsKey(key)) {
+ throw new IllegalArgumentException(
+ String.format("An entity factory named %s is already registered.", name));
+ }
+
+ this.entityFactories.put(key, factory);
+ return this;
+ }
+
+ /**
+ * Registers an entity type for the constructed {@link DurableTaskGrpcWorker}.
+ *
+ * The entity class must implement {@link ITaskEntity} and have a public no-argument constructor.
+ * A new instance of the entity is created for each operation batch using reflection.
+ *
+ * The entity name is derived from the simple class name of the provided type.
+ *
+ * @param entityClass the entity class to register; must implement {@link ITaskEntity}
+ * @return this builder object
+ * @throws IllegalArgumentException if the class does not implement {@link ITaskEntity}
+ */
+ public DurableTaskGrpcWorkerBuilder addEntity(Class extends ITaskEntity> entityClass) {
+ if (entityClass == null) {
+ throw new IllegalArgumentException("entityClass must not be null.");
+ }
+ String name = entityClass.getSimpleName();
+ return this.addEntity(name, entityClass);
+ }
+
+ /**
+ * Registers an entity type with a specific name for the constructed {@link DurableTaskGrpcWorker}.
+ *
+ * The entity class must implement {@link ITaskEntity} and have a public no-argument constructor.
+ * A new instance of the entity is created for each operation batch using reflection.
+ *
+ * @param name the name of the entity type
+ * @param entityClass the entity class to register; must implement {@link ITaskEntity}
+ * @return this builder object
+ * @throws IllegalArgumentException if the class does not implement {@link ITaskEntity}
+ */
+ public DurableTaskGrpcWorkerBuilder addEntity(String name, Class extends ITaskEntity> entityClass) {
+ if (entityClass == null) {
+ throw new IllegalArgumentException("entityClass must not be null.");
+ }
+ if (!ITaskEntity.class.isAssignableFrom(entityClass)) {
+ throw new IllegalArgumentException(
+ String.format("Type %s does not implement ITaskEntity.", entityClass.getName()));
+ }
+ return this.addEntity(name, () -> {
+ try {
+ return entityClass.getDeclaredConstructor().newInstance();
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new RuntimeException(
+ String.format("Failed to create instance of entity type %s. Ensure it has a public no-argument constructor.", entityClass.getName()), e);
+ }
+ });
+ }
+
+ /**
+ * Registers an entity singleton for the constructed {@link DurableTaskGrpcWorker}.
+ *
+ * The same entity instance is reused for every operation batch. This is useful for stateless entities
+ * or entities that manage their own lifecycle.
+ *
+ * The entity name is derived from the simple class name of the provided entity instance.
+ *
+ * Thread safety warning: Because the same instance handles all operation batches,
+ * the entity implementation must be thread-safe if concurrent entity work items are enabled.
+ * Implementations that extend {@link TaskEntity} store mutable state in instance fields and
+ * are not safe for singleton registration. Use {@link #addEntity(Class)} or
+ * {@link #addEntity(String, Class)} instead to create a new instance per batch.
+ *
+ * @param entity the entity instance to register
+ * @return this builder object
+ */
+ public DurableTaskGrpcWorkerBuilder addEntity(ITaskEntity entity) {
+ if (entity == null) {
+ throw new IllegalArgumentException("entity must not be null.");
+ }
+ String name = entity.getClass().getSimpleName();
+ return this.addEntity(name, () -> entity);
+ }
+
+ /**
+ * Registers an entity singleton with a specific name for the constructed {@link DurableTaskGrpcWorker}.
+ *
+ * The same entity instance is reused for every operation batch.
+ *
+ * Thread safety warning: Because the same instance handles all operation batches,
+ * the entity implementation must be thread-safe if concurrent entity work items are enabled.
+ * Implementations that extend {@link TaskEntity} store mutable state in instance fields and
+ * are not safe for singleton registration. Use {@link #addEntity(String, Class)} instead
+ * to create a new instance per batch.
+ *
+ * @param name the name of the entity type
+ * @param entity the entity instance to register
+ * @return this builder object
+ */
+ public DurableTaskGrpcWorkerBuilder addEntity(String name, ITaskEntity entity) {
+ if (entity == null) {
+ throw new IllegalArgumentException("entity must not be null.");
+ }
+ return this.addEntity(name, () -> entity);
+ }
+
/**
* Sets the gRPC channel to use for communicating with the sidecar process.
*
@@ -114,6 +239,24 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn
return this;
}
+ /**
+ * Sets the maximum number of entity work items that can be processed concurrently by this worker.
+ *
+ * Each entity instance is always single-threaded (serial execution), but this setting controls
+ * how many different entity instances can process work items in parallel. The default value is 1.
+ *
+ * @param maxConcurrentEntityWorkItems the maximum number of concurrent entity work items (must be at least 1)
+ * @return this builder object
+ * @throws IllegalArgumentException if the value is less than 1
+ */
+ public DurableTaskGrpcWorkerBuilder maxConcurrentEntityWorkItems(int maxConcurrentEntityWorkItems) {
+ if (maxConcurrentEntityWorkItems < 1) {
+ throw new IllegalArgumentException("maxConcurrentEntityWorkItems must be at least 1.");
+ }
+ this.maxConcurrentEntityWorkItems = maxConcurrentEntityWorkItems;
+ return this;
+ }
+
/**
* Sets the versioning options for this worker.
*
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityInstanceId.java b/client/src/main/java/com/microsoft/durabletask/EntityInstanceId.java
new file mode 100644
index 00000000..ad8153d8
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityInstanceId.java
@@ -0,0 +1,119 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nonnull;
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Immutable identifier for a durable entity instance, consisting of a name and key pair.
+ *
+ * The name typically corresponds to the entity class/type name, and the key identifies the specific
+ * entity instance (e.g., a user ID or account number).
+ */
+public final class EntityInstanceId implements Comparable {
+ private final String name;
+ private final String key;
+
+ /**
+ * Creates a new {@code EntityInstanceId} with the specified name and key.
+ *
+ * @param name the entity name (type), must not be null
+ * @param key the entity key (instance identifier), must not be null
+ * @throws IllegalArgumentException if name or key is null or empty
+ */
+ public EntityInstanceId(@Nonnull String name, @Nonnull String key) {
+ if (name == null || name.isEmpty()) {
+ throw new IllegalArgumentException("Entity name must not be null or empty.");
+ }
+ if (name.contains("@")) {
+ throw new IllegalArgumentException("Entity name must not contain '@'.");
+ }
+ if (key == null || key.isEmpty()) {
+ throw new IllegalArgumentException("Entity key must not be null or empty.");
+ }
+ this.name = name.toLowerCase(Locale.ROOT);
+ this.key = key;
+ }
+
+ /**
+ * Gets the entity name (type).
+ *
+ * @return the entity name
+ */
+ @Nonnull
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * Gets the entity key (instance identifier).
+ *
+ * @return the entity key
+ */
+ @Nonnull
+ public String getKey() {
+ return this.key;
+ }
+
+ /**
+ * Parses an {@code EntityInstanceId} from its string representation.
+ *
+ * The expected format is {@code @{name}@{key}}, matching the .NET {@code EntityId.ToString()} format.
+ *
+ * @param value the string to parse
+ * @return the parsed {@code EntityInstanceId}
+ * @throws IllegalArgumentException if the string is not in the expected format
+ */
+ @Nonnull
+ public static EntityInstanceId fromString(@Nonnull String value) {
+ if (value == null || value.isEmpty()) {
+ throw new IllegalArgumentException("Value must not be null or empty.");
+ }
+ if (!value.startsWith("@")) {
+ throw new IllegalArgumentException(
+ "Invalid EntityInstanceId format. Expected '@{name}@{key}', got: " + value);
+ }
+ int secondAt = value.indexOf('@', 1);
+ if (secondAt < 0) {
+ throw new IllegalArgumentException(
+ "Invalid EntityInstanceId format. Expected '@{name}@{key}', got: " + value);
+ }
+ String name = value.substring(1, secondAt);
+ String key = value.substring(secondAt + 1);
+ return new EntityInstanceId(name, key);
+ }
+
+ /**
+ * Returns the string representation in the format {@code @{name}@{key}}.
+ *
+ * @return the string representation of this entity instance ID
+ */
+ @Override
+ public String toString() {
+ return "@" + this.name + "@" + this.key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ EntityInstanceId that = (EntityInstanceId) o;
+ return this.name.equals(that.name) && this.key.equals(that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.name, this.key);
+ }
+
+ @Override
+ public int compareTo(@Nonnull EntityInstanceId other) {
+ int nameCompare = this.name.compareTo(other.name);
+ if (nameCompare != 0) {
+ return nameCompare;
+ }
+ return this.key.compareTo(other.key);
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityMetadata.java b/client/src/main/java/com/microsoft/durabletask/EntityMetadata.java
new file mode 100644
index 00000000..9ae6b995
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityMetadata.java
@@ -0,0 +1,152 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.time.Instant;
+
+/**
+ * Represents metadata about a durable entity instance, including its identity, state, and lock status.
+ *
+ * For typed state access, see {@link TypedEntityMetadata} which provides a {@code getState()} method
+ * that returns the deserialized state as a specific type.
+ *
+ * @see TypedEntityMetadata
+ */
+public class EntityMetadata {
+ private final String instanceId;
+ private final Instant lastModifiedTime;
+ private final int backlogQueueSize;
+ private final String lockedBy;
+ private final String serializedState;
+ private final boolean includesState;
+ private final DataConverter dataConverter;
+ private EntityInstanceId cachedEntityInstanceId;
+
+ /**
+ * Creates a new {@code EntityMetadata} instance.
+ *
+ * @param instanceId the entity instance ID string (in {@code @name@key} format)
+ * @param lastModifiedTime the time the entity was last modified
+ * @param backlogQueueSize the number of operations waiting in the entity's backlog queue
+ * @param lockedBy the orchestration instance ID that currently holds a lock on this entity, or {@code null}
+ * @param serializedState the serialized entity state, or {@code null} if state was not fetched
+ * @param includesState {@code true} if the state was requested and is included in this metadata
+ * @param dataConverter the data converter used to deserialize state
+ */
+ EntityMetadata(
+ String instanceId,
+ Instant lastModifiedTime,
+ int backlogQueueSize,
+ @Nullable String lockedBy,
+ @Nullable String serializedState,
+ boolean includesState,
+ DataConverter dataConverter) {
+ this.instanceId = instanceId;
+ this.lastModifiedTime = lastModifiedTime;
+ this.backlogQueueSize = backlogQueueSize;
+ this.lockedBy = lockedBy;
+ this.serializedState = serializedState;
+ this.includesState = includesState;
+ this.dataConverter = dataConverter;
+ }
+
+ /**
+ * Gets the entity instance ID string.
+ *
+ * @return the instance ID
+ */
+ public String getInstanceId() {
+ return this.instanceId;
+ }
+
+ /**
+ * Gets the parsed {@link EntityInstanceId} from the instance ID string.
+ *
+ * @return the parsed entity instance ID
+ */
+ public EntityInstanceId getEntityInstanceId() {
+ if (this.cachedEntityInstanceId == null) {
+ this.cachedEntityInstanceId = EntityInstanceId.fromString(this.instanceId);
+ }
+ return this.cachedEntityInstanceId;
+ }
+
+ /**
+ * Gets the time the entity was last modified.
+ *
+ * @return the last modified time
+ */
+ public Instant getLastModifiedTime() {
+ return this.lastModifiedTime;
+ }
+
+ /**
+ * Gets the number of operations waiting in the entity's backlog queue.
+ *
+ * @return the backlog queue size
+ */
+ public int getBacklogQueueSize() {
+ return this.backlogQueueSize;
+ }
+
+ /**
+ * Gets the orchestration instance ID that currently holds a lock on this entity,
+ * or {@code null} if the entity is not locked.
+ *
+ * @return the locking orchestration instance ID, or {@code null}
+ */
+ @Nullable
+ public String getLockedBy() {
+ return this.lockedBy;
+ }
+
+ /**
+ * Gets the raw serialized entity state, or {@code null} if state was not fetched.
+ *
+ * @return the serialized state string, or {@code null}
+ */
+ @Nullable
+ public String getSerializedState() {
+ return this.serializedState;
+ }
+
+ /**
+ * Gets whether this metadata response includes the entity state.
+ *
+ * Queries can exclude the state of the entity from the metadata that is retrieved.
+ * When this returns {@code false}, {@link #getSerializedState()} and {@link #readStateAs(Class)}
+ * will return {@code null}.
+ *
+ * @return {@code true} if state was requested and included in this metadata
+ */
+ public boolean isIncludesState() {
+ return this.includesState;
+ }
+
+ /**
+ * Gets the data converter used for state deserialization.
+ *
+ * This is package-private to allow {@link TypedEntityMetadata} to pass it to the superclass constructor.
+ *
+ * @return the data converter
+ */
+ DataConverter getDataConverter() {
+ return this.dataConverter;
+ }
+
+ /**
+ * Deserializes the entity state into an object of the specified type.
+ *
+ * @param stateType the class to deserialize the state into
+ * @param the target type
+ * @return the deserialized state, or {@code null} if no state is available
+ */
+ @Nullable
+ public T readStateAs(Class stateType) {
+ if (this.serializedState == null) {
+ return null;
+ }
+ return this.dataConverter.deserialize(this.serializedState, stateType);
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityOperationFailedException.java b/client/src/main/java/com/microsoft/durabletask/EntityOperationFailedException.java
new file mode 100644
index 00000000..d34071af
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityOperationFailedException.java
@@ -0,0 +1,70 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Exception thrown when a two-way entity call fails because the entity operation threw an exception.
+ *
+ * This is analogous to {@link TaskFailedException} but specific to entity operations invoked
+ * via {@code callEntity}. The {@link #getFailureDetails()} method provides detailed information
+ * about the failure.
+ */
+public class EntityOperationFailedException extends RuntimeException {
+ private final EntityInstanceId entityId;
+ private final String operationName;
+ private final FailureDetails failureDetails;
+
+ /**
+ * Creates a new {@code EntityOperationFailedException}.
+ *
+ * @param entityId the ID of the entity that failed
+ * @param operationName the name of the operation that failed
+ * @param failureDetails the details of the failure
+ */
+ public EntityOperationFailedException(
+ @Nonnull EntityInstanceId entityId,
+ @Nonnull String operationName,
+ @Nonnull FailureDetails failureDetails) {
+ super(String.format(
+ "Entity operation '%s' on entity '%s' failed: %s",
+ operationName,
+ entityId.toString(),
+ failureDetails.getErrorMessage()));
+ this.entityId = entityId;
+ this.operationName = operationName;
+ this.failureDetails = failureDetails;
+ }
+
+ /**
+ * Gets the ID of the entity that failed.
+ *
+ * @return the entity instance ID
+ */
+ @Nonnull
+ public EntityInstanceId getEntityId() {
+ return this.entityId;
+ }
+
+ /**
+ * Gets the name of the operation that failed.
+ *
+ * @return the operation name
+ */
+ @Nonnull
+ public String getOperationName() {
+ return this.operationName;
+ }
+
+ /**
+ * Gets the failure details, including the error type, message, and stack trace.
+ *
+ * @return the failure details
+ */
+ @Nonnull
+ public FailureDetails getFailureDetails() {
+ return this.failureDetails;
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityProxy.java b/client/src/main/java/com/microsoft/durabletask/EntityProxy.java
new file mode 100644
index 00000000..fb882d7e
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityProxy.java
@@ -0,0 +1,186 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nonnull;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Type;
+
+/**
+ * Creates type-safe proxies for interacting with durable entities from orchestrations.
+ *
+ * A typed entity proxy is a JDK dynamic proxy that implements a user-defined interface.
+ * Method calls on the proxy are translated into entity operations:
+ *
+ * - {@code void} methods become fire-and-forget signals via
+ * {@link TaskOrchestrationContext#signalEntity}
+ * - Methods returning {@link Task}{@code } become two-way calls via
+ * {@link TaskOrchestrationContext#callEntity}
+ *
+ *
+ * The method name is used as the entity operation name (case-insensitive matching on the
+ * entity side). Methods must accept 0 or 1 parameters; the single parameter is passed as
+ * the operation input.
+ *
+ *
Example:
+ *
{@code
+ * // Define entity operations as an interface
+ * public interface ICounter {
+ * void add(int amount); // fire-and-forget signal
+ * void reset(); // fire-and-forget signal
+ * Task get(); // two-way call returning a result
+ * }
+ *
+ * // Use in an orchestration
+ * ICounter counter = ctx.createEntityProxy(entityId, ICounter.class);
+ * counter.add(5);
+ * counter.reset();
+ * int value = counter.get().await();
+ * }
+ *
+ * @see TaskOrchestrationContext#createEntityProxy(EntityInstanceId, Class)
+ * @see TaskOrchestrationEntityFeature#createProxy(EntityInstanceId, Class)
+ */
+public final class EntityProxy {
+
+ private EntityProxy() {
+ // Utility class — not instantiable
+ }
+
+ /**
+ * Creates a typed entity proxy for the given entity instance.
+ *
+ * @param ctx the orchestration context (used to send signals and calls)
+ * @param entityId the target entity's instance ID
+ * @param proxyInterface the interface whose methods map to entity operations
+ * @param the proxy interface type
+ * @return a proxy instance that implements {@code proxyInterface}
+ * @throws IllegalArgumentException if {@code proxyInterface} is not an interface
+ */
+ @SuppressWarnings("unchecked")
+ public static T create(
+ @Nonnull TaskOrchestrationContext ctx,
+ @Nonnull EntityInstanceId entityId,
+ @Nonnull Class proxyInterface) {
+ if (ctx == null) {
+ throw new IllegalArgumentException("ctx must not be null");
+ }
+ if (entityId == null) {
+ throw new IllegalArgumentException("entityId must not be null");
+ }
+ if (proxyInterface == null) {
+ throw new IllegalArgumentException("proxyInterface must not be null");
+ }
+ if (!proxyInterface.isInterface()) {
+ throw new IllegalArgumentException(
+ "proxyInterface must be an interface, got: " + proxyInterface.getName());
+ }
+
+ return (T) Proxy.newProxyInstance(
+ proxyInterface.getClassLoader(),
+ new Class>[]{ proxyInterface },
+ new EntityInvocationHandler(ctx, entityId));
+ }
+
+ /**
+ * Invocation handler that translates interface method calls into entity operations.
+ */
+ private static final class EntityInvocationHandler implements InvocationHandler {
+ private final TaskOrchestrationContext ctx;
+ private final EntityInstanceId entityId;
+
+ EntityInvocationHandler(TaskOrchestrationContext ctx, EntityInstanceId entityId) {
+ this.ctx = ctx;
+ this.entityId = entityId;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ // Handle java.lang.Object methods
+ if (method.getDeclaringClass() == Object.class) {
+ switch (method.getName()) {
+ case "toString":
+ return "EntityProxy[" + entityId + "]";
+ case "hashCode":
+ return entityId.hashCode();
+ case "equals":
+ if (args == null || args.length == 0) {
+ return false;
+ }
+ if (args[0] == proxy) {
+ return true;
+ }
+ if (args[0] == null || !Proxy.isProxyClass(args[0].getClass())) {
+ return false;
+ }
+ InvocationHandler otherHandler = Proxy.getInvocationHandler(args[0]);
+ if (otherHandler instanceof EntityInvocationHandler) {
+ return entityId.equals(((EntityInvocationHandler) otherHandler).entityId);
+ }
+ return false;
+ default:
+ return method.invoke(this, args);
+ }
+ }
+
+ String operationName = method.getName();
+
+ if (args != null && args.length > 1) {
+ throw new UnsupportedOperationException(
+ "Entity proxy methods must have 0 or 1 parameters. " +
+ "Method '" + operationName + "' has " + args.length + " parameters. " +
+ "Use a single wrapper object to pass multiple values.");
+ }
+
+ Object input = (args != null && args.length == 1) ? args[0] : null;
+
+ Class> returnType = method.getReturnType();
+
+ if (returnType == void.class) {
+ // Fire-and-forget signal
+ ctx.signalEntity(entityId, operationName, input);
+ return null;
+ } else if (Task.class.isAssignableFrom(returnType)) {
+ // Two-way entity call — extract the Task type parameter
+ Class> resultType = extractTaskTypeParameter(method);
+ return ctx.callEntity(entityId, operationName, input, resultType);
+ } else {
+ throw new UnsupportedOperationException(
+ "Entity proxy methods must return void (for signals) or Task (for calls). " +
+ "Method '" + operationName + "' returns " + returnType.getName() + ".");
+ }
+ }
+
+ /**
+ * Extracts the generic type parameter from a method returning {@code Task}.
+ * Falls back to {@code Void.class} if the type cannot be determined.
+ */
+ private static Class> extractTaskTypeParameter(Method method) {
+ Type genericReturnType = method.getGenericReturnType();
+ if (genericReturnType instanceof ParameterizedType) {
+ ParameterizedType pt = (ParameterizedType) genericReturnType;
+ Type[] typeArgs = pt.getActualTypeArguments();
+ if (typeArgs.length > 0) {
+ return getRawClass(typeArgs[0]);
+ }
+ }
+ return Void.class;
+ }
+
+ /**
+ * Resolves a {@link Type} to its raw {@link Class}, handling parameterized types
+ * and wildcard types.
+ */
+ private static Class> getRawClass(Type type) {
+ if (type instanceof Class) {
+ return (Class>) type;
+ } else if (type instanceof ParameterizedType) {
+ return getRawClass(((ParameterizedType) type).getRawType());
+ }
+ return Object.class;
+ }
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityQuery.java b/client/src/main/java/com/microsoft/durabletask/EntityQuery.java
new file mode 100644
index 00000000..0cf652fb
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityQuery.java
@@ -0,0 +1,218 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.time.Instant;
+import java.util.Locale;
+
+/**
+ * Represents a query filter for fetching durable entity metadata from the store.
+ *
+ * Use the builder-style setters to configure the query parameters, then pass this object to
+ * {@link DurableEntityClient#queryEntities(EntityQuery)}.
+ */
+public final class EntityQuery {
+
+ /**
+ * The default page size for entity queries ({@value}).
+ * This matches the .NET SDK's {@code EntityQuery.DefaultPageSize}.
+ */
+ public static final int DEFAULT_PAGE_SIZE = 100;
+
+ private String instanceIdStartsWith;
+ private Instant lastModifiedFrom;
+ private Instant lastModifiedTo;
+ private boolean includeState = true;
+ private boolean includeTransient;
+ private Integer pageSize;
+ private String continuationToken;
+
+ /**
+ * Creates a new {@code EntityQuery} with default settings.
+ */
+ public EntityQuery() {
+ }
+
+ /**
+ * Sets a prefix filter on entity instance IDs.
+ *
+ * If the value does not start with {@code @}, it is treated as a raw entity name and will be
+ * normalized to the internal format {@code @entityname} (lowercased) to match how entity
+ * instance IDs are stored.
+ *
+ * @param instanceIdStartsWith the instance ID prefix to filter by, or {@code null} for no filter
+ * @return this {@code EntityQuery} for chaining
+ */
+ public EntityQuery setInstanceIdStartsWith(@Nullable String instanceIdStartsWith) {
+ if (instanceIdStartsWith != null && !instanceIdStartsWith.startsWith("@")) {
+ // Normalize: treat as raw entity name, prepend '@' and lowercase
+ instanceIdStartsWith = "@" + instanceIdStartsWith.toLowerCase(Locale.ROOT);
+ } else if (instanceIdStartsWith != null) {
+ // Already in @name format, but ensure the entity name portion is lowercased
+ // Format is @entityName or @entityName@key
+ int secondAt = instanceIdStartsWith.indexOf('@', 1);
+ if (secondAt > 0) {
+ // Has key part: lowercase only the name between first and second @
+ instanceIdStartsWith = "@" + instanceIdStartsWith.substring(1, secondAt).toLowerCase(Locale.ROOT)
+ + instanceIdStartsWith.substring(secondAt);
+ } else {
+ // Only @name, lowercase the name portion
+ instanceIdStartsWith = "@" + instanceIdStartsWith.substring(1).toLowerCase(Locale.ROOT);
+ }
+ }
+ this.instanceIdStartsWith = instanceIdStartsWith;
+ return this;
+ }
+
+ /**
+ * Gets the instance ID prefix filter.
+ *
+ * @return the instance ID prefix, or {@code null}
+ */
+ @Nullable
+ public String getInstanceIdStartsWith() {
+ return this.instanceIdStartsWith;
+ }
+
+ /**
+ * Sets the minimum last-modified time filter (inclusive).
+ *
+ * @param lastModifiedFrom the minimum last-modified time, or {@code null} for no lower bound
+ * @return this {@code EntityQuery} for chaining
+ */
+ public EntityQuery setLastModifiedFrom(@Nullable Instant lastModifiedFrom) {
+ this.lastModifiedFrom = lastModifiedFrom;
+ return this;
+ }
+
+ /**
+ * Gets the minimum last-modified time filter.
+ *
+ * @return the minimum last-modified time, or {@code null}
+ */
+ @Nullable
+ public Instant getLastModifiedFrom() {
+ return this.lastModifiedFrom;
+ }
+
+ /**
+ * Sets the maximum last-modified time filter (inclusive).
+ *
+ * @param lastModifiedTo the maximum last-modified time, or {@code null} for no upper bound
+ * @return this {@code EntityQuery} for chaining
+ */
+ public EntityQuery setLastModifiedTo(@Nullable Instant lastModifiedTo) {
+ this.lastModifiedTo = lastModifiedTo;
+ return this;
+ }
+
+ /**
+ * Gets the maximum last-modified time filter.
+ *
+ * @return the maximum last-modified time, or {@code null}
+ */
+ @Nullable
+ public Instant getLastModifiedTo() {
+ return this.lastModifiedTo;
+ }
+
+ /**
+ * Sets whether to include entity state in the query results.
+ *
+ * @param includeState {@code true} to include state, {@code false} to omit it
+ * @return this {@code EntityQuery} for chaining
+ */
+ public EntityQuery setIncludeState(boolean includeState) {
+ this.includeState = includeState;
+ return this;
+ }
+
+ /**
+ * Gets whether entity state is included in query results.
+ *
+ * @return {@code true} if state is included
+ */
+ public boolean isIncludeState() {
+ return this.includeState;
+ }
+
+ /**
+ * Sets whether to include transient (not yet persisted) entities in the results.
+ *
+ * @param includeTransient {@code true} to include transient entities
+ * @return this {@code EntityQuery} for chaining
+ */
+ public EntityQuery setIncludeTransient(boolean includeTransient) {
+ this.includeTransient = includeTransient;
+ return this;
+ }
+
+ /**
+ * Gets whether transient entities are included in query results.
+ *
+ * @return {@code true} if transient entities are included
+ */
+ public boolean isIncludeTransient() {
+ return this.includeTransient;
+ }
+
+ /**
+ * Sets the maximum number of results to return per page.
+ *
+ * @param pageSize the page size, or {@code null} for the server default
+ * @return this {@code EntityQuery} for chaining
+ */
+ public EntityQuery setPageSize(@Nullable Integer pageSize) {
+ this.pageSize = pageSize;
+ return this;
+ }
+
+ /**
+ * Gets the maximum number of results per page.
+ *
+ * @return the page size, or {@code null} for the server default
+ */
+ @Nullable
+ public Integer getPageSize() {
+ return this.pageSize;
+ }
+
+ /**
+ * Sets the continuation token for fetching the next page of results.
+ *
+ * @param continuationToken the continuation token from a previous query, or {@code null} to start from the beginning
+ * @return this {@code EntityQuery} for chaining
+ */
+ public EntityQuery setContinuationToken(@Nullable String continuationToken) {
+ this.continuationToken = continuationToken;
+ return this;
+ }
+
+ /**
+ * Gets the continuation token for pagination.
+ *
+ * @return the continuation token, or {@code null}
+ */
+ @Nullable
+ public String getContinuationToken() {
+ return this.continuationToken;
+ }
+
+ /**
+ * Creates a shallow copy of this query.
+ *
+ * @return a new {@code EntityQuery} with the same field values
+ */
+ public EntityQuery copy() {
+ EntityQuery copy = new EntityQuery();
+ copy.instanceIdStartsWith = this.instanceIdStartsWith;
+ copy.lastModifiedFrom = this.lastModifiedFrom;
+ copy.lastModifiedTo = this.lastModifiedTo;
+ copy.includeState = this.includeState;
+ copy.includeTransient = this.includeTransient;
+ copy.pageSize = this.pageSize;
+ copy.continuationToken = this.continuationToken;
+ return copy;
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityQueryPageable.java b/client/src/main/java/com/microsoft/durabletask/EntityQueryPageable.java
new file mode 100644
index 00000000..b21e64b7
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityQueryPageable.java
@@ -0,0 +1,180 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+
+/**
+ * An auto-paginating iterable over entity query results.
+ *
+ * This class automatically handles pagination when iterating over entity metadata results.
+ * It fetches pages from the store on demand and yields individual {@link EntityMetadata}
+ * items to the caller.
+ *
+ * Use {@link DurableEntityClient#getAllEntities(EntityQuery)} to obtain an instance of this class.
+ *
+ *
Example: iterate over all entities
+ * {@code
+ * EntityQuery query = new EntityQuery()
+ * .setInstanceIdStartsWith("counter")
+ * .setIncludeState(true);
+ *
+ * for (EntityMetadata entity : client.getEntities().getAllEntities(query)) {
+ * System.out.println(entity.getEntityInstanceId());
+ * }
+ * }
+ *
+ * Example: iterate page by page
+ * {@code
+ * for (EntityQueryResult page : client.getEntities().getAllEntities(query).byPage()) {
+ * System.out.println("Got " + page.getEntities().size() + " entities");
+ * for (EntityMetadata entity : page.getEntities()) {
+ * System.out.println(entity.getEntityInstanceId());
+ * }
+ * }
+ * }
+ */
+public final class EntityQueryPageable implements Iterable {
+ private final EntityQuery baseQuery;
+ private final Function queryExecutor;
+
+ /**
+ * Creates a new {@code EntityQueryPageable}.
+ *
+ * @param baseQuery the base query parameters
+ * @param queryExecutor the function that executes a single page query
+ */
+ EntityQueryPageable(EntityQuery baseQuery, Function queryExecutor) {
+ this.baseQuery = baseQuery;
+ this.queryExecutor = queryExecutor;
+ }
+
+ /**
+ * Returns an iterator over individual {@link EntityMetadata} items, automatically
+ * fetching subsequent pages as needed.
+ *
+ * @return an iterator over all matching entities
+ */
+ @Override
+ public Iterator iterator() {
+ return new EntityItemIterator();
+ }
+
+ /**
+ * Returns an iterable over pages of results, where each page is an {@link EntityQueryResult}
+ * containing a list of entities and an optional continuation token.
+ *
+ * @return an iterable over result pages
+ */
+ public Iterable byPage() {
+ return PageIterable::new;
+ }
+
+ private class EntityItemIterator implements Iterator {
+ private String continuationToken = baseQuery.getContinuationToken();
+ private Iterator currentPageIterator;
+ private boolean finished;
+
+ EntityItemIterator() {
+ fetchNextPage();
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (true) {
+ if (currentPageIterator != null && currentPageIterator.hasNext()) {
+ return true;
+ }
+ if (finished) {
+ return false;
+ }
+ fetchNextPage();
+ }
+ }
+
+ @Override
+ public EntityMetadata next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return currentPageIterator.next();
+ }
+
+ private void fetchNextPage() {
+ if (finished) {
+ return;
+ }
+
+ EntityQuery pageQuery = cloneQuery(baseQuery);
+ pageQuery.setContinuationToken(continuationToken);
+
+ EntityQueryResult result = queryExecutor.apply(pageQuery);
+ List entities = result.getEntities();
+
+ if (entities == null || entities.isEmpty()) {
+ finished = true;
+ currentPageIterator = null;
+ return;
+ }
+
+ currentPageIterator = entities.iterator();
+ continuationToken = result.getContinuationToken();
+
+ if (continuationToken == null || continuationToken.isEmpty()) {
+ finished = true;
+ }
+ }
+ }
+
+ private class PageIterable implements Iterator {
+ private String continuationToken = baseQuery.getContinuationToken();
+ private boolean finished;
+ private boolean firstPage = true;
+
+ @Override
+ public boolean hasNext() {
+ return !finished;
+ }
+
+ @Override
+ public EntityQueryResult next() {
+ if (finished) {
+ throw new NoSuchElementException();
+ }
+
+ EntityQuery pageQuery = cloneQuery(baseQuery);
+ if (!firstPage) {
+ pageQuery.setContinuationToken(continuationToken);
+ }
+ firstPage = false;
+
+ EntityQueryResult result = queryExecutor.apply(pageQuery);
+ continuationToken = result.getContinuationToken();
+
+ if (continuationToken == null || continuationToken.isEmpty()) {
+ finished = true;
+ }
+
+ return result;
+ }
+ }
+
+ private static EntityQuery cloneQuery(EntityQuery source) {
+ EntityQuery clone = new EntityQuery();
+ if (source.getInstanceIdStartsWith() != null) {
+ // Use raw setter value since the source is already normalized
+ clone.setInstanceIdStartsWith(source.getInstanceIdStartsWith());
+ }
+ clone.setLastModifiedFrom(source.getLastModifiedFrom());
+ clone.setLastModifiedTo(source.getLastModifiedTo());
+ clone.setIncludeState(source.isIncludeState());
+ clone.setIncludeTransient(source.isIncludeTransient());
+ clone.setPageSize(source.getPageSize());
+ clone.setContinuationToken(source.getContinuationToken());
+ return clone;
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityQueryResult.java b/client/src/main/java/com/microsoft/durabletask/EntityQueryResult.java
new file mode 100644
index 00000000..cac22cc7
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityQueryResult.java
@@ -0,0 +1,45 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Represents the result of an entity query operation, including the matching entities and
+ * an optional continuation token for pagination.
+ */
+public final class EntityQueryResult {
+ private final List entities;
+ private final String continuationToken;
+
+ /**
+ * Creates a new {@code EntityQueryResult}.
+ *
+ * @param entities the list of entity metadata records matching the query
+ * @param continuationToken the continuation token for fetching the next page, or {@code null} if no more results
+ */
+ EntityQueryResult(List entities, @Nullable String continuationToken) {
+ this.entities = entities;
+ this.continuationToken = continuationToken;
+ }
+
+ /**
+ * Gets the list of entity metadata records matching the query.
+ *
+ * @return the list of entity metadata
+ */
+ public List getEntities() {
+ return this.entities;
+ }
+
+ /**
+ * Gets the continuation token for fetching the next page of results.
+ *
+ * @return the continuation token, or {@code null} if there are no more results
+ */
+ @Nullable
+ public String getContinuationToken() {
+ return this.continuationToken;
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/EntityRunner.java b/client/src/main/java/com/microsoft/durabletask/EntityRunner.java
new file mode 100644
index 00000000..d28c6952
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/EntityRunner.java
@@ -0,0 +1,132 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.EntityBatchRequest;
+import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.EntityBatchResult;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.logging.Logger;
+
+/**
+ * Helper class for invoking entity operations directly, without constructing a {@link DurableTaskGrpcWorker} object.
+ *
+ * This static class can be used to execute entity logic directly. In order to use it for this purpose, the
+ * caller must provide entity state as serialized protobuf bytes. This is the entity equivalent of
+ * {@link OrchestrationRunner}.
+ *
+ * Typical usage in an Azure Functions entity trigger:
+ *
+ * {@literal @}FunctionName("Counter")
+ * public String counterEntity(
+ * {@literal @}DurableEntityTrigger(name = "req") String req) {
+ * return EntityRunner.loadAndRun(req, () -> new CounterEntity());
+ * }
+ *
+ */
+public final class EntityRunner {
+ private static final Logger logger = Logger.getLogger(EntityRunner.class.getPackage().getName());
+
+ private EntityRunner() {
+ }
+
+ /**
+ * Loads an entity batch request from {@code base64EncodedEntityRequest} and uses it to execute
+ * entity operations using the entity created by {@code entityFactory}.
+ *
+ * @param base64EncodedEntityRequest the base64-encoded protobuf payload representing an entity batch request
+ * @param entityFactory a factory that creates the entity instance to handle operations
+ * @return a base64-encoded protobuf payload of the entity batch result
+ * @throws IllegalArgumentException if either parameter is {@code null} or if {@code base64EncodedEntityRequest}
+ * is not valid base64-encoded protobuf
+ */
+ public static String loadAndRun(String base64EncodedEntityRequest, TaskEntityFactory entityFactory) {
+ byte[] decodedBytes = Base64.getDecoder().decode(base64EncodedEntityRequest);
+ byte[] resultBytes = loadAndRun(decodedBytes, entityFactory);
+ return Base64.getEncoder().encodeToString(resultBytes);
+ }
+
+ /**
+ * Loads an entity batch request from {@code entityRequestBytes} and uses it to execute
+ * entity operations using the entity created by {@code entityFactory}.
+ *
+ * @param entityRequestBytes the protobuf payload representing an entity batch request
+ * @param entityFactory a factory that creates the entity instance to handle operations
+ * @return a protobuf-encoded payload of the entity batch result
+ * @throws IllegalArgumentException if either parameter is {@code null} or if {@code entityRequestBytes}
+ * is not valid protobuf
+ */
+ public static byte[] loadAndRun(byte[] entityRequestBytes, TaskEntityFactory entityFactory) {
+ if (entityRequestBytes == null || entityRequestBytes.length == 0) {
+ throw new IllegalArgumentException("entityRequestBytes must not be null or empty");
+ }
+
+ if (entityFactory == null) {
+ throw new IllegalArgumentException("entityFactory must not be null");
+ }
+
+ EntityBatchRequest request;
+ try {
+ request = EntityBatchRequest.parseFrom(entityRequestBytes);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException("entityRequestBytes was not valid protobuf", e);
+ }
+
+ // Parse entity name from the instance ID so the executor can look it up
+ String instanceId = request.getInstanceId();
+ String entityName;
+ try {
+ entityName = EntityInstanceId.fromString(instanceId).getName();
+ } catch (Exception e) {
+ // Fallback: use the raw instance ID as the entity name
+ entityName = instanceId;
+ }
+
+ HashMap factories = new HashMap<>();
+ factories.put(entityName, entityFactory);
+
+ TaskEntityExecutor executor = new TaskEntityExecutor(
+ factories,
+ new JacksonDataConverter(),
+ logger);
+
+ EntityBatchResult result = executor.execute(request);
+ return result.toByteArray();
+ }
+
+ /**
+ * Loads an entity batch request from {@code base64EncodedEntityRequest} and uses it to execute
+ * entity operations using the provided {@code entity} instance.
+ *
+ * @param base64EncodedEntityRequest the base64-encoded protobuf payload representing an entity batch request
+ * @param entity the entity instance to handle operations
+ * @return a base64-encoded protobuf payload of the entity batch result
+ * @throws IllegalArgumentException if either parameter is {@code null} or if {@code base64EncodedEntityRequest}
+ * is not valid base64-encoded protobuf
+ */
+ public static String loadAndRun(String base64EncodedEntityRequest, ITaskEntity entity) {
+ if (entity == null) {
+ throw new IllegalArgumentException("entity must not be null");
+ }
+ return loadAndRun(base64EncodedEntityRequest, () -> entity);
+ }
+
+ /**
+ * Loads an entity batch request from {@code entityRequestBytes} and uses it to execute
+ * entity operations using the provided {@code entity} instance.
+ *
+ * @param entityRequestBytes the protobuf payload representing an entity batch request
+ * @param entity the entity instance to handle operations
+ * @return a protobuf-encoded payload of the entity batch result
+ * @throws IllegalArgumentException if either parameter is {@code null} or if {@code entityRequestBytes}
+ * is not valid protobuf
+ */
+ public static byte[] loadAndRun(byte[] entityRequestBytes, ITaskEntity entity) {
+ if (entity == null) {
+ throw new IllegalArgumentException("entity must not be null");
+ }
+ return loadAndRun(entityRequestBytes, () -> entity);
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/GrpcDurableEntityClient.java b/client/src/main/java/com/microsoft/durabletask/GrpcDurableEntityClient.java
new file mode 100644
index 00000000..9e92c602
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/GrpcDurableEntityClient.java
@@ -0,0 +1,175 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Timestamp;
+import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*;
+import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub;
+
+import javax.annotation.Nullable;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * gRPC-based implementation of {@link DurableEntityClient}.
+ */
+final class GrpcDurableEntityClient extends DurableEntityClient {
+
+ private final TaskHubSidecarServiceBlockingStub sidecarClient;
+ private final DataConverter dataConverter;
+
+ GrpcDurableEntityClient(
+ String name,
+ TaskHubSidecarServiceBlockingStub sidecarClient,
+ DataConverter dataConverter) {
+ super(name);
+ this.sidecarClient = sidecarClient;
+ this.dataConverter = dataConverter;
+ }
+
+ @Override
+ public void signalEntity(
+ EntityInstanceId entityId,
+ String operationName,
+ @Nullable Object input,
+ @Nullable SignalEntityOptions options) {
+ Helpers.throwIfArgumentNull(entityId, "entityId");
+ Helpers.throwIfArgumentNull(operationName, "operationName");
+
+ SignalEntityRequest.Builder builder = SignalEntityRequest.newBuilder()
+ .setInstanceId(entityId.toString())
+ .setName(operationName)
+ .setRequestId(UUID.randomUUID().toString());
+
+ if (input != null) {
+ String serializedInput = this.dataConverter.serialize(input);
+ if (serializedInput != null) {
+ builder.setInput(StringValue.of(serializedInput));
+ }
+ }
+
+ if (options != null && options.getScheduledTime() != null) {
+ Timestamp ts = DataConverter.getTimestampFromInstant(options.getScheduledTime());
+ builder.setScheduledTime(ts);
+ }
+
+ this.sidecarClient.signalEntity(builder.build());
+ }
+
+ @Override
+ @Nullable
+ public EntityMetadata getEntityMetadata(EntityInstanceId entityId, boolean includeState) {
+ Helpers.throwIfArgumentNull(entityId, "entityId");
+
+ GetEntityRequest request = GetEntityRequest.newBuilder()
+ .setInstanceId(entityId.toString())
+ .setIncludeState(includeState)
+ .build();
+
+ GetEntityResponse response = this.sidecarClient.getEntity(request);
+ if (!response.getExists()) {
+ return null;
+ }
+
+ return toEntityMetadata(response.getEntity());
+ }
+
+ @Override
+ public EntityQueryResult queryEntities(EntityQuery query) {
+ Helpers.throwIfArgumentNull(query, "query");
+
+ com.microsoft.durabletask.implementation.protobuf.OrchestratorService.EntityQuery.Builder queryBuilder =
+ com.microsoft.durabletask.implementation.protobuf.OrchestratorService.EntityQuery.newBuilder();
+
+ if (query.getInstanceIdStartsWith() != null) {
+ queryBuilder.setInstanceIdStartsWith(StringValue.of(query.getInstanceIdStartsWith()));
+ }
+ if (query.getLastModifiedFrom() != null) {
+ queryBuilder.setLastModifiedFrom(DataConverter.getTimestampFromInstant(query.getLastModifiedFrom()));
+ }
+ if (query.getLastModifiedTo() != null) {
+ queryBuilder.setLastModifiedTo(DataConverter.getTimestampFromInstant(query.getLastModifiedTo()));
+ }
+ queryBuilder.setIncludeState(query.isIncludeState());
+ queryBuilder.setIncludeTransient(query.isIncludeTransient());
+ if (query.getPageSize() != null) {
+ queryBuilder.setPageSize(com.google.protobuf.Int32Value.of(query.getPageSize()));
+ }
+ if (query.getContinuationToken() != null) {
+ queryBuilder.setContinuationToken(StringValue.of(query.getContinuationToken()));
+ }
+
+ QueryEntitiesRequest request = QueryEntitiesRequest.newBuilder()
+ .setQuery(queryBuilder)
+ .build();
+
+ QueryEntitiesResponse response = this.sidecarClient.queryEntities(request);
+
+ List entities = new ArrayList<>();
+ for (com.microsoft.durabletask.implementation.protobuf.OrchestratorService.EntityMetadata protoEntity
+ : response.getEntitiesList()) {
+ entities.add(toEntityMetadata(protoEntity));
+ }
+
+ String continuationToken = response.hasContinuationToken()
+ ? response.getContinuationToken().getValue()
+ : null;
+
+ return new EntityQueryResult(entities, continuationToken);
+ }
+
+ @Override
+ public CleanEntityStorageResult cleanEntityStorage(CleanEntityStorageRequest request) {
+ Helpers.throwIfArgumentNull(request, "request");
+
+ int totalEmptyEntitiesRemoved = 0;
+ int totalOrphanedLocksReleased = 0;
+ String continuationToken = request.getContinuationToken();
+
+ do {
+ com.microsoft.durabletask.implementation.protobuf.OrchestratorService.CleanEntityStorageRequest.Builder builder =
+ com.microsoft.durabletask.implementation.protobuf.OrchestratorService.CleanEntityStorageRequest.newBuilder()
+ .setRemoveEmptyEntities(request.isRemoveEmptyEntities())
+ .setReleaseOrphanedLocks(request.isReleaseOrphanedLocks());
+
+ if (continuationToken != null) {
+ builder.setContinuationToken(StringValue.of(continuationToken));
+ }
+
+ CleanEntityStorageResponse response = this.sidecarClient.cleanEntityStorage(builder.build());
+
+ totalEmptyEntitiesRemoved += response.getEmptyEntitiesRemoved();
+ totalOrphanedLocksReleased += response.getOrphanedLocksReleased();
+
+ continuationToken = response.hasContinuationToken()
+ ? response.getContinuationToken().getValue()
+ : null;
+ } while (request.isContinueUntilComplete() && continuationToken != null);
+
+ return new CleanEntityStorageResult(
+ continuationToken,
+ totalEmptyEntitiesRemoved,
+ totalOrphanedLocksReleased);
+ }
+
+ private EntityMetadata toEntityMetadata(
+ com.microsoft.durabletask.implementation.protobuf.OrchestratorService.EntityMetadata protoEntity) {
+ Instant lastModifiedTime = DataConverter.getInstantFromTimestamp(protoEntity.getLastModifiedTime());
+ String lockedBy = protoEntity.hasLockedBy() ? protoEntity.getLockedBy().getValue() : null;
+ String serializedState = protoEntity.hasSerializedState()
+ ? protoEntity.getSerializedState().getValue()
+ : null;
+
+ return new EntityMetadata(
+ protoEntity.getInstanceId(),
+ lastModifiedTime,
+ protoEntity.getBacklogQueueSize(),
+ lockedBy,
+ serializedState,
+ protoEntity.hasSerializedState(),
+ this.dataConverter);
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/ITaskEntity.java b/client/src/main/java/com/microsoft/durabletask/ITaskEntity.java
new file mode 100644
index 00000000..65d49c54
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/ITaskEntity.java
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+/**
+ * Common interface for durable entity implementations.
+ *
+ * Entities are stateful, single-threaded actors identified by a name+key pair
+ * ({@link EntityInstanceId}). The durable task runtime manages state persistence,
+ * message routing, and concurrency control.
+ *
+ * Implement this interface directly for full control over entity behavior, or extend
+ * {@link TaskEntity} for a higher-level programming model with automatic reflection-based
+ * operation dispatch.
+ *
+ * @see TaskEntity
+ */
+@FunctionalInterface
+public interface ITaskEntity {
+ /**
+ * Executes the entity logic for a single operation.
+ *
+ * @param operation the operation to execute, including the operation name, input, state, and context
+ * @return the result of the operation, which will be serialized and returned to the caller
+ * (for two-way calls). May be {@code null} for void operations.
+ * @throws Exception if the operation fails
+ */
+ Object runAsync(TaskEntityOperation operation) throws Exception;
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/SignalEntityOptions.java b/client/src/main/java/com/microsoft/durabletask/SignalEntityOptions.java
new file mode 100644
index 00000000..1c3a91ed
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/SignalEntityOptions.java
@@ -0,0 +1,41 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.time.Instant;
+
+/**
+ * Options for signaling a durable entity.
+ */
+public final class SignalEntityOptions {
+ private Instant scheduledTime;
+
+ /**
+ * Creates a new {@code SignalEntityOptions} with default settings.
+ */
+ public SignalEntityOptions() {
+ }
+
+ /**
+ * Sets the scheduled time for the signal. If set, the signal will be delivered at the specified time
+ * rather than immediately.
+ *
+ * @param scheduledTime the time at which the signal should be delivered
+ * @return this {@code SignalEntityOptions} object for chaining
+ */
+ public SignalEntityOptions setScheduledTime(@Nullable Instant scheduledTime) {
+ this.scheduledTime = scheduledTime;
+ return this;
+ }
+
+ /**
+ * Gets the scheduled time for the signal, or {@code null} if the signal should be delivered immediately.
+ *
+ * @return the scheduled time, or {@code null}
+ */
+ @Nullable
+ public Instant getScheduledTime() {
+ return this.scheduledTime;
+ }
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/TaskEntity.java b/client/src/main/java/com/microsoft/durabletask/TaskEntity.java
new file mode 100644
index 00000000..b78d5abc
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/TaskEntity.java
@@ -0,0 +1,430 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for durable entities that provides automatic reflection-based operation dispatch.
+ *
+ * Subclasses define operations as public methods. When an operation is received, {@code TaskEntity}
+ * resolves it by:
+ *
+ * - Reflection dispatch on {@code this}: Look for a public method on the entity class
+ * whose name matches the operation name (case-insensitive).
+ * - State dispatch: If no method is found on the entity, look for a matching public
+ * method on the {@code TState} object.
+ * - Implicit delete: If the operation name is "delete" and no explicit method exists,
+ * delete the entity state.
+ * - If none of the above match, throw {@link UnsupportedOperationException}.
+ *
+ *
+ * Methods may accept 0, 1, or 2 parameters. Supported parameter types are the operation input type
+ * and {@link TaskEntityContext}. The method may return the operation result or be {@code void}.
+ *
+ *
Example:
+ *
{@code
+ * public class CounterEntity extends TaskEntity {
+ * public void add(int amount) { this.state += amount; }
+ * public void reset() { this.state = 0; }
+ * public int get() { return this.state; }
+ *
+ * protected Integer initializeState(TaskEntityOperation operation) {
+ * return 0;
+ * }
+ * }
+ * }
+ *
+ * @param the type of the entity's state
+ */
+public abstract class TaskEntity implements ITaskEntity {
+
+ /**
+ * The current state of the entity. Subclasses may read and write this field directly
+ * in their operation methods.
+ */
+ protected TState state;
+
+ /**
+ * The current entity context, providing access to entity metadata such as the entity ID
+ * and the ability to signal other entities.
+ *
+ * This property is automatically set before each operation dispatch and is available for use
+ * in operation methods. This mirrors the .NET SDK's {@code TaskEntity.Context} property.
+ */
+ protected TaskEntityContext context;
+
+ /**
+ * Controls whether operations can be dispatched to methods on the state object.
+ * When {@code true}, if no matching method is found on the entity class itself,
+ * the framework will look for a matching method on the state object.
+ * When {@code false} (the default), only methods on the entity class are considered.
+ *
+ * This matches the .NET SDK default where {@code AllowStateDispatch} is {@code false}.
+ */
+ private boolean allowStateDispatch = false;
+
+ // Per-class cache for resolved methods, keyed by operation name (lowercased).
+ // Uses ClassValue to scope the cache per entity class, matching the .NET model
+ // where MethodInfo caching is per-type. GC can collect entries when classes are unloaded.
+ private static final ClassValue