Skip to content

Commit 35dfa08

Browse files
committed
Workflow versioning
Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent de3eef6 commit 35dfa08

7 files changed

Lines changed: 392 additions & 2 deletions

File tree

dev-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Flask>=1.1
1414
# needed for auto fix
1515
ruff===0.14.1
1616
# needed for dapr-ext-workflow
17-
durabletask-dapr >= 0.2.0a15
17+
durabletask-dapr >= 0.2.0a16
1818
# needed for .env file loading in examples
1919
python-dotenv>=1.0.0
2020
# needed for enhanced schema generation from function features

examples/workflow/README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,3 +461,50 @@ app1 - received workflow error from app2
461461
```
462462
among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too.
463463

464+
465+
### Versioning
466+
467+
This example demonstrates how to version a workflow.
468+
The test consists of two parts:
469+
1. Uses most of the common features of the workflow versioning. It also leaves some workflows stalled to demonstrate the stalled workflow feature.
470+
2. Fixes the stalled workflows to get them to completion.
471+
472+
It had to be done in two parts because the runtime needs to be restarted in order to rerun stalled workflows.
473+
474+
The Dapr CLI can be started using the following command:
475+
476+
<!--STEP
477+
name: Run the versioning example
478+
match_order: none
479+
expected_stdout_lines:
480+
- "== APP == test1: triggering workflow"
481+
- "== APP == test1: Received workflow call for version1"
482+
- "== APP == test1: Finished workflow for version1"
483+
- "== APP == test2: triggering workflow"
484+
- "== APP == test2: Received workflow call for version1"
485+
- "== APP == test2: Finished workflow for version1"
486+
- "== APP == test3: triggering workflow"
487+
- "== APP == test3: Received workflow call for version2"
488+
- "== APP == test3: Finished workflow for version2"
489+
- "== APP == test4: start"
490+
- "== APP == test4: patch1 is patched"
491+
- "== APP == test5: start"
492+
- "== APP == test5: patch1 is not patched"
493+
- "== APP == test5: patch2 is patched"
494+
- "== APP == test6: start"
495+
- "== APP == test6: patch1 is patched"
496+
- "== APP == test6: patch2 is patched"
497+
- "== APP == test7: Received workflow call for version1"
498+
- "== APP == test7: Workflow is stalled"
499+
- "== APP == test8: Workflow is stalled"
500+
- "== APP == test100: part2"
501+
- "== APP == test100: Finished stalled version1 workflow"
502+
- "== APP == test100: Finished stalled patching workflow"
503+
timeout_seconds: 60
504+
-->
505+
506+
```sh
507+
dapr run --app-id wf-versioning-example -- python3 versioning.py part1
508+
dapr run --app-id wf-versioning-example --log-level debug -- python3 versioning.py part2
509+
```
510+
<!--END_STEP-->

examples/workflow/versioning.py

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2026 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import sys
14+
import time
15+
16+
import dapr.ext.workflow as wf
17+
from durabletask.task import Activity
18+
19+
20+
current_test = 0
21+
22+
def print_test(message):
23+
print(f'test{current_test}: {message}', flush=True)
24+
25+
print_activity = None
26+
27+
wfr = None
28+
def new_wfr():
29+
global wfr
30+
global print_activity
31+
32+
if wfr is not None:
33+
wfr.shutdown()
34+
wfr = wf.WorkflowRuntime()
35+
print_activity = lambda ctx, input: print_test(input)
36+
wfr.register_activity(print_activity, name='print_activity')
37+
38+
new_wfr()
39+
40+
def test_full_versioning(client: wf.DaprWorkflowClient):
41+
global current_test
42+
43+
# Start with only one version defined. Runnig the workflow should run this version as it normally would.
44+
current_test = 1
45+
@wfr.versioned_workflow(name="workflow", is_latest=True)
46+
def version1_workflow(ctx: wf.DaprWorkflowContext):
47+
yield ctx.call_activity(print_activity, input='Received workflow call for version1')
48+
yield ctx.wait_for_external_event(name='event')
49+
yield ctx.call_activity(print_activity, input='Finished workflow for version1')
50+
return 1
51+
52+
print_test('triggering workflow')
53+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
54+
client.raise_workflow_event(instance_id, event_name='event')
55+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
56+
57+
# Now we start a workflow, but introduce a latest version half way. It should resume the execution in the old version.
58+
current_test = 2
59+
print_test('triggering workflow')
60+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
61+
time.sleep(2) # wait for the workflow to start and wait for the event
62+
63+
@wfr.versioned_workflow(name="workflow", is_latest=True)
64+
def version2_workflow(ctx: wf.DaprWorkflowContext):
65+
yield ctx.call_activity(print_activity, input='Received workflow call for version2')
66+
yield ctx.wait_for_external_event(name='event')
67+
yield ctx.call_activity(print_activity, input='Finished workflow for version2')
68+
return 1
69+
70+
client.raise_workflow_event(instance_id, event_name='event')
71+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
72+
73+
74+
# Now we have the two versions defined, running the workflow now should run v2 as it's the latest version.
75+
current_test = 3
76+
print_test('triggering workflow')
77+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
78+
client.raise_workflow_event(instance_id, event_name='event')
79+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
80+
81+
def test_patching(client: wf.DaprWorkflowClient):
82+
global current_test
83+
84+
@wfr.workflow
85+
def patching_workflow(ctx: wf.DaprWorkflowContext):
86+
# This function will be changed throughout the test, to simulate different scenarios
87+
return workflow_code(ctx)
88+
89+
90+
# Runs the patched branch by default
91+
current_test = 4
92+
def workflow_code(ctx: wf.DaprWorkflowContext):
93+
yield ctx.call_activity(print_activity, input='start')
94+
if ctx.is_patched('patch1'):
95+
yield ctx.call_activity(print_activity, input='patch1 is patched')
96+
else:
97+
yield ctx.call_activity(print_activity, input='patch1 is not patched')
98+
return 1
99+
100+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
101+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
102+
103+
# When the execution passed the place where a patch is introduced, it should be not patched.
104+
def workflow_code(ctx: wf.DaprWorkflowContext):
105+
yield ctx.call_activity(print_activity, input='start')
106+
yield ctx.wait_for_external_event(name='event')
107+
if ctx.is_patched('patch2'):
108+
yield ctx.call_activity(print_activity, input='patch2 is patched')
109+
else:
110+
yield ctx.call_activity(print_activity, input='patch2 is not patched')
111+
return 1
112+
113+
current_test = 5
114+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
115+
time.sleep(2)
116+
def workflow_code(ctx: wf.DaprWorkflowContext):
117+
yield ctx.call_activity(print_activity, input='start')
118+
if ctx.is_patched('patch1'):
119+
yield ctx.call_activity(print_activity, input='patch1 is patched')
120+
else:
121+
yield ctx.call_activity(print_activity, input='patch1 is not patched')
122+
yield ctx.wait_for_external_event(name='event')
123+
if ctx.is_patched('patch2'):
124+
yield ctx.call_activity(print_activity, input='patch2 is patched')
125+
else:
126+
yield ctx.call_activity(print_activity, input='patch2 is not patched')
127+
return 1
128+
129+
client.raise_workflow_event(instance_id, event_name='event')
130+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
131+
132+
# It remembers previous patches.
133+
def workflow_code(ctx: wf.DaprWorkflowContext):
134+
yield ctx.call_activity(print_activity, input='start')
135+
if ctx.is_patched('patch1'):
136+
pass # keep it silenced for now, we'll add logs later and this ones would confuse the test
137+
else:
138+
pass
139+
yield ctx.wait_for_external_event(name='event')
140+
if ctx.is_patched('patch2'):
141+
yield ctx.call_activity(print_activity, input='patch2 is patched')
142+
else:
143+
yield ctx.call_activity(print_activity, input='patch2 is not patched')
144+
return 1
145+
146+
current_test = 6
147+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
148+
time.sleep(2)
149+
def workflow_code(ctx: wf.DaprWorkflowContext):
150+
yield ctx.call_activity(print_activity, input='start')
151+
if ctx.is_patched('patch1'):
152+
yield ctx.call_activity(print_activity, input='patch1 is patched')
153+
else:
154+
yield ctx.call_activity(print_activity, input='patch1 is not patched')
155+
yield ctx.wait_for_external_event(name='event')
156+
if ctx.is_patched('patch2'):
157+
yield ctx.call_activity(print_activity, input='patch2 is patched')
158+
else:
159+
yield ctx.call_activity(print_activity, input='patch2 is not patched')
160+
return 1
161+
162+
client.raise_workflow_event(instance_id, event_name='event')
163+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
164+
165+
166+
def test_full_versioning_stall(client: wf.DaprWorkflowClient):
167+
global current_test
168+
169+
new_wfr()
170+
@wfr.versioned_workflow(name="stall_workflow", is_latest=True)
171+
def version1_workflow(ctx: wf.DaprWorkflowContext):
172+
yield ctx.call_activity(print_activity, input='Received workflow call for version1')
173+
yield ctx.wait_for_external_event(name='event')
174+
yield ctx.call_activity(print_activity, input='Finished workflow for version1')
175+
return 1
176+
177+
wfr.start()
178+
current_test = 7
179+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
180+
time.sleep(3)
181+
new_wfr()
182+
183+
@wfr.versioned_workflow(name="stall_workflow", is_latest=True)
184+
def version2_workflow(ctx: wf.DaprWorkflowContext):
185+
yield ctx.call_activity(print_activity, input='Received workflow call for version2')
186+
yield ctx.wait_for_external_event(name='event')
187+
yield ctx.call_activity(print_activity, input='Finished workflow for version2')
188+
return 1
189+
wfr.start()
190+
client.raise_workflow_event(instance_id, event_name='event')
191+
time.sleep(2)
192+
md = client.get_workflow_state(instance_id)
193+
if md.runtime_status == wf.WorkflowStatus.STALLED:
194+
print_test('Workflow is stalled')
195+
else:
196+
print_test('Workflow is not stalled')
197+
198+
def test_patching_stall(client: wf.DaprWorkflowClient):
199+
global current_test
200+
201+
current_test = 8
202+
@wfr.workflow
203+
def patching_workflow(ctx: wf.DaprWorkflowContext):
204+
# This function will be changed throughout the test, to simulate different scenarios
205+
return workflow_code(ctx)
206+
207+
def workflow_code(ctx: wf.DaprWorkflowContext):
208+
if ctx.is_patched('patch1'):
209+
pass
210+
else:
211+
pass
212+
yield ctx.wait_for_external_event(name='event')
213+
return 1
214+
215+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
216+
time.sleep(2)
217+
218+
def workflow_code(ctx: wf.DaprWorkflowContext):
219+
# Removed patch1 check
220+
yield ctx.wait_for_external_event(name='event')
221+
return 1
222+
223+
client.raise_workflow_event(instance_id, event_name='event')
224+
time.sleep(2)
225+
md = client.get_workflow_state(instance_id)
226+
if md.runtime_status == wf.WorkflowStatus.STALLED:
227+
print_test('Workflow is stalled')
228+
else:
229+
print_test('Workflow is not stalled')
230+
231+
232+
def main():
233+
args = sys.argv[1:]
234+
if len(args) == 0:
235+
print('Usage: python versioning.py <part1|part2>')
236+
return
237+
if args[0] == 'part1':
238+
wfr.start()
239+
time.sleep(2) # wait for workflow runtime to start
240+
client = wf.DaprWorkflowClient()
241+
242+
test_full_versioning(client)
243+
test_patching(client)
244+
245+
test_full_versioning_stall(client)
246+
test_patching_stall(client)
247+
wfr.shutdown()
248+
elif args[0] == 'part2':
249+
global current_test
250+
current_test = 100
251+
print_test('part2')
252+
@wfr.versioned_workflow(name="stall_workflow", is_latest=False)
253+
def version1_workflow(ctx: wf.DaprWorkflowContext):
254+
yield ctx.call_activity(print_activity, input='Received workflow call for version1')
255+
yield ctx.wait_for_external_event(name='event')
256+
yield ctx.call_activity(print_activity, input='Finished stalled version1 workflow')
257+
return 1
258+
@wfr.versioned_workflow(name="stall_workflow", is_latest=True)
259+
def version2_workflow(ctx: wf.DaprWorkflowContext):
260+
yield ctx.call_activity(print_activity, input='Received workflow call for version2')
261+
yield ctx.wait_for_external_event(name='event')
262+
yield ctx.call_activity(print_activity, input='Finished stalled version2 workflow')
263+
return 1
264+
265+
@wfr.workflow
266+
def patching_workflow(ctx: wf.DaprWorkflowContext):
267+
if ctx.is_patched('patch1'):
268+
pass
269+
else:
270+
pass
271+
yield ctx.wait_for_external_event(name='event')
272+
yield ctx.call_activity(print_activity, input='Finished stalled patching workflow')
273+
return 1
274+
275+
wfr.start()
276+
time.sleep(10)
277+
wfr.shutdown()
278+
279+
if __name__ == '__main__':
280+
main()

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
154154
self.__obj.continue_as_new(new_input, save_events=save_events)
155155

156156

157+
def is_patched(self, patch_name: str) -> bool:
158+
self._logger.debug(f'{self.instance_id}: Checking if {patch_name} is patched')
159+
return self.__obj.is_patched(patch_name)
160+
161+
157162
def when_all(tasks: List[task.Task[T]]) -> task.WhenAllTask[T]:
158163
"""Returns a task that completes when all of the provided tasks complete or when one of the
159164
tasks fail."""

0 commit comments

Comments
 (0)