From 733af7930c821e145d94aa9f70e28e38990536b0 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:39:30 +0100 Subject: [PATCH 1/4] fix: apply DAPR_API_TIMEOUT_SECONDS to workflow gRPC connections The workflow extension's gRPC connections (DaprWorkflowClient, async DaprWorkflowClient, and WorkflowRuntime) were not respecting the DAPR_API_TIMEOUT_SECONDS environment variable, unlike the core SDK's DaprGrpcClient which applies it via a timeout interceptor. Pass DaprClientTimeoutInterceptor (sync) and DaprClientTimeoutInterceptorAsync (async) to the durabletask TaskHubGrpcClient, AsyncTaskHubGrpcClient, and TaskHubGrpcWorker so that workflow gRPC calls get the configured default timeout. Signed-off-by: Fabian Martinez Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- .../dapr/ext/workflow/aio/dapr_workflow_client.py | 2 ++ .../dapr/ext/workflow/dapr_workflow_client.py | 2 ++ ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py | 6 +++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index cd5e632f..bf44f5c8 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -27,6 +27,7 @@ from grpc.aio import AioRpcError from dapr.clients import DaprInternalError +from dapr.aio.clients.grpc.interceptors import DaprClientTimeoutInterceptorAsync from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -68,6 +69,7 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=[DaprClientTimeoutInterceptorAsync()], ) async def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 36a731c4..1aa85472 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -27,6 +27,7 @@ from grpc import RpcError from dapr.clients import DaprInternalError +from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -71,6 +72,7 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=[DaprClientTimeoutInterceptor()], ) def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index e2bf50d4..53ddfc0e 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -27,6 +27,7 @@ from durabletask import task, worker from dapr.clients import DaprInternalError +from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -71,13 +72,16 @@ def __init__( raise DaprInternalError(f'{error}') from error options = self._logger.get_options() + all_interceptors = [DaprClientTimeoutInterceptor()] + if interceptors: + all_interceptors.extend(interceptors) self.__worker = worker.TaskHubGrpcWorker( host_address=uri.endpoint, metadata=metadata, secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, - interceptors=interceptors, + interceptors=all_interceptors, concurrency_options=worker.ConcurrencyOptions( maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items, maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items, From b0395b30262a67fedec8ae883e0c4553977374e3 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 13 Mar 2026 08:53:12 +0100 Subject: [PATCH 2/4] lint Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- .../dapr/ext/workflow/aio/dapr_workflow_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index bf44f5c8..339b6b77 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -26,8 +26,8 @@ from durabletask.aio import client as aioclient from grpc.aio import AioRpcError -from dapr.clients import DaprInternalError from dapr.aio.clients.grpc.interceptors import DaprClientTimeoutInterceptorAsync +from dapr.clients import DaprInternalError from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint From 7034b313ee860b1bc4502ac66a2ea0120363a1c3 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 13 Mar 2026 13:36:48 +0100 Subject: [PATCH 3/4] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 53ddfc0e..4c47c566 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -72,9 +72,10 @@ def __init__( raise DaprInternalError(f'{error}') from error options = self._logger.get_options() - all_interceptors = [DaprClientTimeoutInterceptor()] + all_interceptors = [] if interceptors: all_interceptors.extend(interceptors) + all_interceptors.append(DaprClientTimeoutInterceptor()) self.__worker = worker.TaskHubGrpcWorker( host_address=uri.endpoint, metadata=metadata, From 615eab146e7a56fcbb59b9771ee41e88ede96678 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Wed, 18 Mar 2026 17:11:31 +0100 Subject: [PATCH 4/4] add unit tests --- .../tests/test_workflow_client.py | 21 ++++++- .../tests/test_workflow_client_aio.py | 21 ++++++- .../tests/test_workflow_runtime.py | 59 +++++++++++++++++++ 3 files changed, 97 insertions(+), 4 deletions(-) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 7d66d68b..bc7f422f 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -19,11 +19,12 @@ from unittest import mock import durabletask.internal.orchestrator_service_pb2 as pb -from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient -from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from durabletask import client from grpc import RpcError +from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient +from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext + mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' mock_terminate_result = 'terminate001' @@ -112,6 +113,20 @@ def _inner_get_orchestration_state(self, instance_id, state: client.Orchestratio ) +class WorkflowClientTimeoutInterceptorTest(unittest.TestCase): + def test_timeout_interceptor_is_passed_to_client(self): + with mock.patch('durabletask.client.TaskHubGrpcClient') as mock_client_cls: + DaprWorkflowClient() + mock_client_cls.assert_called_once() + call_kwargs = mock_client_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 1) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + + class WorkflowClientTest(unittest.TestCase): def mock_client_wf(ctx: DaprWorkflowContext, input): print(f'{input}') @@ -184,3 +199,5 @@ def test_client_functions(self): actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id) assert actual_purge_result == mock_purge_result + actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id) + assert actual_purge_result == mock_purge_result diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py index d27047ce..136533d7 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py @@ -19,11 +19,12 @@ from unittest import mock import durabletask.internal.orchestrator_service_pb2 as pb -from dapr.ext.workflow.aio import DaprWorkflowClient -from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from durabletask import client from grpc.aio import AioRpcError +from dapr.ext.workflow.aio import DaprWorkflowClient +from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext + mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' mock_terminate_result = 'terminate001' @@ -113,6 +114,20 @@ def _inner_get_orchestration_state(self, instance_id, state: client.Orchestratio ) +class WorkflowClientAioTimeoutInterceptorTest(unittest.IsolatedAsyncioTestCase): + async def test_timeout_interceptor_is_passed_to_client(self): + with mock.patch('durabletask.aio.client.AsyncTaskHubGrpcClient') as mock_client_cls: + DaprWorkflowClient() + mock_client_cls.assert_called_once() + call_kwargs = mock_client_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 1) + from dapr.aio.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptorAsync + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptorAsync) + + class WorkflowClientAioTest(unittest.IsolatedAsyncioTestCase): def mock_client_wf(ctx: DaprWorkflowContext, input): print(f'{input}') @@ -188,3 +203,5 @@ async def test_client_functions(self): actual_purge_result = await wfClient.purge_workflow(instance_id=mock_instance_id) assert actual_purge_result == mock_purge_result + actual_purge_result = await wfClient.purge_workflow(instance_id=mock_instance_id) + assert actual_purge_result == mock_purge_result diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index b3cadd4a..4f28a23a 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -17,6 +17,8 @@ from typing import List from unittest import mock +import grpc + from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name @@ -39,6 +41,59 @@ def add_named_activity(self, name: str, fn): self._activity_fns[name] = fn +class WorkflowRuntimeTimeoutInterceptorTest(unittest.TestCase): + def setUp(self): + listActivities.clear() + listOrchestrators.clear() + self._registry_patch = mock.patch( + 'durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker() + ) + self._registry_patch.start() + + def tearDown(self): + mock.patch.stopall() + + def test_timeout_interceptor_is_prepended(self): + with mock.patch('durabletask.worker.TaskHubGrpcWorker') as mock_worker_cls: + WorkflowRuntime() + mock_worker_cls.assert_called_once() + call_kwargs = mock_worker_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 1) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + + def test_timeout_interceptor_with_custom_interceptors(self): + custom_interceptor = mock.MagicMock(spec=grpc.UnaryUnaryClientInterceptor) + with mock.patch('durabletask.worker.TaskHubGrpcWorker') as mock_worker_cls: + WorkflowRuntime(interceptors=[custom_interceptor]) + call_kwargs = mock_worker_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 2) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + self.assertIs(interceptors[1], custom_interceptor) + + def test_timeout_interceptor_preserves_custom_interceptor_order(self): + custom1 = mock.MagicMock(spec=grpc.UnaryUnaryClientInterceptor) + custom2 = mock.MagicMock(spec=grpc.UnaryStreamClientInterceptor) + with mock.patch('durabletask.worker.TaskHubGrpcWorker') as mock_worker_cls: + WorkflowRuntime(interceptors=[custom1, custom2]) + call_kwargs = mock_worker_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 3) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + self.assertIs(interceptors[1], custom1) + self.assertIs(interceptors[2], custom2) + + class WorkflowRuntimeTest(unittest.TestCase): def setUp(self): listActivities.clear() @@ -618,3 +673,7 @@ def my_fn(ctx): with self.assertRaises(ValueError) as ctx: alternate_name(name='second')(my_fn) self.assertIn('already has an alternate name', str(ctx.exception)) + + with self.assertRaises(ValueError) as ctx: + alternate_name(name='second')(my_fn) + self.assertIn('already has an alternate name', str(ctx.exception))