-
Notifications
You must be signed in to change notification settings - Fork 139
Expand file tree
/
Copy pathtest_workflow_client.py
More file actions
203 lines (164 loc) · 7.76 KB
/
test_workflow_client.py
File metadata and controls
203 lines (164 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# -*- coding: utf-8 -*-
"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
from datetime import datetime
from typing import Any, Union
from unittest import mock
import durabletask.internal.orchestrator_service_pb2 as pb
from durabletask import client
from grpc import RpcError
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
# Canned sentinel return values: FakeTaskHubGrpcClient returns these so the
# tests can verify that DaprWorkflowClient forwards each call to the
# corresponding method on the underlying task-hub gRPC client.
mock_schedule_result = 'workflow001'
mock_raise_event_result = 'event001'
mock_terminate_result = 'terminate001'
mock_suspend_result = 'suspend001'
mock_resume_result = 'resume001'
mock_purge_result = 'purge001'
mock_instance_id = 'instance001'
# Module-level switch read by FakeTaskHubGrpcClient.get_orchestration_state:
# 'not-found' -> raises a "no such instance exists" RpcError,
# 'found'     -> returns a PENDING state,
# anything else -> raises an "unknown error" RpcError.
wf_status = 'not-found'
class SimulatedRpcError(RpcError):
    """Test double for grpc.RpcError exposing a fixed status code and details.

    Real RpcError instances come from the gRPC runtime; this fake lets the
    fake task-hub client raise errors with controlled code()/details() values.
    """

    def __init__(self, code, details):
        self._status_code = code
        self._error_details = details

    def code(self):
        """Return the simulated gRPC status code."""
        return self._status_code

    def details(self):
        """Return the simulated error detail string."""
        return self._error_details
class FakeTaskHubGrpcClient:
    """Stand-in for durabletask's TaskHubGrpcClient.

    Every method returns the matching module-level ``mock_*_result`` sentinel
    so tests can confirm DaprWorkflowClient delegates to the right call.
    ``get_orchestration_state`` is additionally driven by the module-level
    ``wf_status`` flag.
    """

    def __init__(self):
        # Captures the workflow handed to schedule_new_orchestration so tests
        # can assert what was scheduled.
        self.last_scheduled_workflow_name = None

    def schedule_new_orchestration(
        self,
        workflow,
        input,
        instance_id,
        start_at,
        reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None,
    ):
        self.last_scheduled_workflow_name = workflow
        return mock_schedule_result

    def get_orchestration_state(self, instance_id, fetch_payloads):
        # Outcome is selected by the module-level ``wf_status`` switch.
        if wf_status == 'found':
            return self._make_state(instance_id, client.OrchestrationStatus.PENDING)
        if wf_status == 'not-found':
            raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists')
        raise SimulatedRpcError(code='UNKNOWN', details='unknown error')

    def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout):
        return self._make_state(instance_id, client.OrchestrationStatus.RUNNING)

    def wait_for_orchestration_completion(self, instance_id, fetch_payloads, timeout):
        return self._make_state(instance_id, client.OrchestrationStatus.COMPLETED)

    def raise_orchestration_event(
        self, instance_id: str, event_name: str, *, data: Union[Any, None] = None
    ):
        return mock_raise_event_result

    def terminate_orchestration(
        self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True
    ):
        return mock_terminate_result

    def suspend_orchestration(self, instance_id: str):
        return mock_suspend_result

    def resume_orchestration(self, instance_id: str):
        return mock_resume_result

    def purge_orchestration(self, instance_id: str, recursive: bool = True):
        return mock_purge_result

    def _make_state(self, instance_id, state: client.OrchestrationStatus):
        """Build a minimal OrchestrationState carrying the given status."""
        return client.OrchestrationState(
            instance_id=instance_id,
            name='',
            runtime_status=state,
            created_at=datetime.now(),
            last_updated_at=datetime.now(),
            serialized_input=None,
            serialized_output=None,
            serialized_custom_status=None,
            failure_details=None,
        )
class WorkflowClientTimeoutInterceptorTest(unittest.TestCase):
    """Checks that DaprWorkflowClient installs the timeout interceptor."""

    def test_timeout_interceptor_is_passed_to_client(self):
        from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor

        with mock.patch('durabletask.client.TaskHubGrpcClient') as patched_cls:
            DaprWorkflowClient()

        # Exactly one client is constructed, with exactly one interceptor:
        # the DaprClientTimeoutInterceptor.
        patched_cls.assert_called_once()
        passed_kwargs = patched_cls.call_args[1]
        passed_interceptors = passed_kwargs['interceptors']
        self.assertEqual(len(passed_interceptors), 1)
        self.assertIsInstance(passed_interceptors[0], DaprClientTimeoutInterceptor)
class WorkflowClientTest(unittest.TestCase):
    """Exercises DaprWorkflowClient end-to-end against FakeTaskHubGrpcClient."""

    # Declared as a staticmethod: the original bare ``def mock_client_wf(ctx, input)``
    # would bind ``self`` into ``ctx`` when accessed via ``self.mock_client_wf``.
    # Only its identity matters here — it is scheduled, never invoked.
    @staticmethod
    def mock_client_wf(ctx: DaprWorkflowContext, input):
        print(f'{input}')

    def test_schedule_workflow_by_name_string(self):
        """Scheduling by name forwards the string to the task-hub client."""
        fake_client = FakeTaskHubGrpcClient()
        with mock.patch('durabletask.client.TaskHubGrpcClient', return_value=fake_client):
            wfClient = DaprWorkflowClient()
            result = wfClient.schedule_new_workflow(workflow='my_registered_workflow', input='data')
            assert result == mock_schedule_result
            assert fake_client.last_scheduled_workflow_name == 'my_registered_workflow'

    def test_client_functions(self):
        """Each client method returns the fake's canned sentinel result."""
        with mock.patch(
            'durabletask.client.TaskHubGrpcClient', return_value=FakeTaskHubGrpcClient()
        ):
            wfClient = DaprWorkflowClient()
            actual_schedule_result = wfClient.schedule_new_workflow(
                workflow=self.mock_client_wf, input='Hi Chef!'
            )
            assert actual_schedule_result == mock_schedule_result

            # 'not-found': the underlying RpcError is swallowed and None returned.
            global wf_status
            wf_status = 'not-found'
            actual_get_result = wfClient.get_workflow_state(
                instance_id=mock_instance_id, fetch_payloads=True
            )
            assert actual_get_result is None

            # Any other error detail propagates as an RpcError.
            wf_status = 'error'
            with self.assertRaises(RpcError):
                wfClient.get_workflow_state(instance_id=mock_instance_id, fetch_payloads=True)

            # 'found': the fake's PENDING state is returned as-is.
            wf_status = 'found'
            actual_get_result = wfClient.get_workflow_state(
                instance_id=mock_instance_id, fetch_payloads=True
            )
            assert actual_get_result.runtime_status.name == 'PENDING'
            assert actual_get_result.instance_id == mock_instance_id

            actual_wait_start_result = wfClient.wait_for_workflow_start(
                instance_id=mock_instance_id, timeout_in_seconds=30
            )
            assert actual_wait_start_result.runtime_status.name == 'RUNNING'
            assert actual_wait_start_result.instance_id == mock_instance_id

            actual_wait_completion_result = wfClient.wait_for_workflow_completion(
                instance_id=mock_instance_id, timeout_in_seconds=30
            )
            assert actual_wait_completion_result.runtime_status.name == 'COMPLETED'
            assert actual_wait_completion_result.instance_id == mock_instance_id

            actual_raise_event_result = wfClient.raise_workflow_event(
                instance_id=mock_instance_id, event_name='test_event', data='test_data'
            )
            assert actual_raise_event_result == mock_raise_event_result

            actual_terminate_result = wfClient.terminate_workflow(
                instance_id=mock_instance_id, output='test_output'
            )
            assert actual_terminate_result == mock_terminate_result

            actual_suspend_result = wfClient.pause_workflow(instance_id=mock_instance_id)
            assert actual_suspend_result == mock_suspend_result

            actual_resume_result = wfClient.resume_workflow(instance_id=mock_instance_id)
            assert actual_resume_result == mock_resume_result

            actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id)
            assert actual_purge_result == mock_purge_result