Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit a503330

Browse files
committed
Workflow versioning
Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent 226b036 commit a503330

8 files changed

Lines changed: 569 additions & 256 deletions

File tree

durabletask/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class OrchestrationStatus(Enum):
3333
PENDING = pb.ORCHESTRATION_STATUS_PENDING
3434
SUSPENDED = pb.ORCHESTRATION_STATUS_SUSPENDED
3535
CANCELED = pb.ORCHESTRATION_STATUS_CANCELED
36+
STALLED = pb.ORCHESTRATION_STATUS_STALLED
3637

3738
def __str__(self):
3839
return helpers.get_orchestration_status_str(self.value)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
4b86756497d875b97f9a91051781b5711c1e4fa6
1+
889781bbe90e6ec84ebe169978c4f2fd0df74ff0

durabletask/internal/helpers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,15 @@ def new_complete_orchestration_action(
188188
)
189189

190190

191+
def new_orchestrator_version_not_available_action(
192+
id: int,
193+
) -> pb.OrchestratorAction:
194+
return pb.OrchestratorAction(
195+
id=id,
196+
orchestratorVersionNotAvailable=pb.OrchestratorVersionNotAvailableAction(),
197+
)
198+
199+
191200
def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction:
192201
timestamp = timestamp_pb2.Timestamp()
193202
timestamp.FromDatetime(fire_at)

durabletask/internal/orchestrator_service_pb2.py

Lines changed: 245 additions & 227 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

durabletask/internal/orchestrator_service_pb2.pyi

Lines changed: 99 additions & 18 deletions
Large diffs are not rendered by default.

durabletask/internal/orchestrator_service_pb2_grpc.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,16 @@ def __init__(self, channel):
170170
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RerunWorkflowFromEventRequest.SerializeToString,
171171
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RerunWorkflowFromEventResponse.FromString,
172172
_registered_method=True)
173+
self.ListInstanceIDs = channel.unary_unary(
174+
'/TaskHubSidecarService/ListInstanceIDs',
175+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIDsRequest.SerializeToString,
176+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIDsResponse.FromString,
177+
_registered_method=True)
178+
self.GetInstanceHistory = channel.unary_unary(
179+
'/TaskHubSidecarService/GetInstanceHistory',
180+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceHistoryRequest.SerializeToString,
181+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceHistoryResponse.FromString,
182+
_registered_method=True)
173183

174184

175185
class TaskHubSidecarServiceServicer(object):
@@ -360,6 +370,18 @@ def RerunWorkflowFromEvent(self, request, context):
360370
context.set_details('Method not implemented!')
361371
raise NotImplementedError('Method not implemented!')
362372

373+
def ListInstanceIDs(self, request, context):
374+
"""Missing associated documentation comment in .proto file."""
375+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
376+
context.set_details('Method not implemented!')
377+
raise NotImplementedError('Method not implemented!')
378+
379+
def GetInstanceHistory(self, request, context):
380+
"""Missing associated documentation comment in .proto file."""
381+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
382+
context.set_details('Method not implemented!')
383+
raise NotImplementedError('Method not implemented!')
384+
363385

364386
def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
365387
rpc_method_handlers = {
@@ -498,6 +520,16 @@ def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
498520
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RerunWorkflowFromEventRequest.FromString,
499521
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RerunWorkflowFromEventResponse.SerializeToString,
500522
),
523+
'ListInstanceIDs': grpc.unary_unary_rpc_method_handler(
524+
servicer.ListInstanceIDs,
525+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIDsRequest.FromString,
526+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIDsResponse.SerializeToString,
527+
),
528+
'GetInstanceHistory': grpc.unary_unary_rpc_method_handler(
529+
servicer.GetInstanceHistory,
530+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceHistoryRequest.FromString,
531+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceHistoryResponse.SerializeToString,
532+
),
501533
}
502534
generic_handler = grpc.method_handlers_generic_handler(
503535
'TaskHubSidecarService', rpc_method_handlers)
@@ -1237,3 +1269,57 @@ def RerunWorkflowFromEvent(request,
12371269
timeout,
12381270
metadata,
12391271
_registered_method=True)
1272+
1273+
@staticmethod
1274+
def ListInstanceIDs(request,
1275+
target,
1276+
options=(),
1277+
channel_credentials=None,
1278+
call_credentials=None,
1279+
insecure=False,
1280+
compression=None,
1281+
wait_for_ready=None,
1282+
timeout=None,
1283+
metadata=None):
1284+
return grpc.experimental.unary_unary(
1285+
request,
1286+
target,
1287+
'/TaskHubSidecarService/ListInstanceIDs',
1288+
durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIDsRequest.SerializeToString,
1289+
durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIDsResponse.FromString,
1290+
options,
1291+
channel_credentials,
1292+
insecure,
1293+
call_credentials,
1294+
compression,
1295+
wait_for_ready,
1296+
timeout,
1297+
metadata,
1298+
_registered_method=True)
1299+
1300+
@staticmethod
1301+
def GetInstanceHistory(request,
1302+
target,
1303+
options=(),
1304+
channel_credentials=None,
1305+
call_credentials=None,
1306+
insecure=False,
1307+
compression=None,
1308+
wait_for_ready=None,
1309+
timeout=None,
1310+
metadata=None):
1311+
return grpc.experimental.unary_unary(
1312+
request,
1313+
target,
1314+
'/TaskHubSidecarService/GetInstanceHistory',
1315+
durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceHistoryRequest.SerializeToString,
1316+
durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceHistoryResponse.FromString,
1317+
options,
1318+
channel_credentials,
1319+
insecure,
1320+
call_credentials,
1321+
compression,
1322+
wait_for_ready,
1323+
timeout,
1324+
metadata,
1325+
_registered_method=True)

durabletask/task.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,22 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
189189
"""
190190
pass
191191

192+
@abstractmethod
193+
def is_patched(self, patch_name: str) -> bool:
194+
"""Check if the given patch name can be applied to the orchestration.
195+
196+
Parameters
197+
----------
198+
patch_name : str
199+
The name of the patch to check.
200+
201+
Returns
202+
-------
203+
bool
204+
True if the given patch name can be applied to the orchestration, False otherwise.
205+
"""
206+
pass
207+
192208

193209
class FailureDetails:
194210
def __init__(self, message: str, error_type: str, stack_trace: Optional[str]):

durabletask/worker.py

Lines changed: 112 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
TInput = TypeVar("TInput")
2626
TOutput = TypeVar("TOutput")
2727

28+
class VersionNotRegisteredException(Exception):
29+
pass
2830

2931
class ConcurrencyOptions:
3032
"""Configuration options for controlling concurrency of different work item types and the thread pool size.
@@ -74,30 +76,58 @@ def __init__(
7476

7577
class _Registry:
7678
orchestrators: dict[str, task.Orchestrator]
79+
versioned_orchestrators: dict[str, dict[str, task.Orchestrator]]
80+
latest_versioned_orchestrators_version_name: dict[str, str]
7781
activities: dict[str, task.Activity]
7882

7983
def __init__(self):
8084
self.orchestrators = {}
85+
self.versioned_orchestrators = {}
86+
self.latest_versioned_orchestrators_version_name = {}
8187
self.activities = {}
8288

83-
def add_orchestrator(self, fn: task.Orchestrator) -> str:
89+
def add_orchestrator(self, fn: task.Orchestrator, version_name: Optional[str] = None, is_latest: bool = False) -> str:
8490
if fn is None:
8591
raise ValueError("An orchestrator function argument is required.")
8692

8793
name = task.get_name(fn)
88-
self.add_named_orchestrator(name, fn)
94+
self.add_named_orchestrator(name, fn, version_name, is_latest)
8995
return name
9096

91-
def add_named_orchestrator(self, name: str, fn: task.Orchestrator) -> None:
97+
def add_named_orchestrator(self, name: str, fn: task.Orchestrator, version_name: Optional[str] = None, is_latest: bool = False) -> None:
9298
if not name:
9399
raise ValueError("A non-empty orchestrator name is required.")
100+
101+
if version_name is None:
102+
if name in self.orchestrators:
103+
raise ValueError(f"A '{name}' orchestrator already exists.")
104+
self.orchestrators[name] = fn
105+
else:
106+
if name not in self.versioned_orchestrators:
107+
self.versioned_orchestrators[name] = {}
108+
if version_name in self.versioned_orchestrators[name]:
109+
raise ValueError(f"The version '{version_name}' of '{name}' orchestrator already exists.")
110+
self.versioned_orchestrators[name][version_name] = fn
111+
if is_latest:
112+
self.latest_versioned_orchestrators_version_name[name] = version_name
113+
114+
def get_orchestrator(self, name: str, version_name: Optional[str] = None) -> Optional[tuple[task.Orchestrator, str]]:
94115
if name in self.orchestrators:
95-
raise ValueError(f"A '{name}' orchestrator already exists.")
116+
return self.orchestrators.get(name), None
96117

97-
self.orchestrators[name] = fn
118+
if name in self.versioned_orchestrators:
119+
if version_name:
120+
version_to_use = version_name
121+
elif name in self.latest_versioned_orchestrators_version_name:
122+
version_to_use = self.latest_versioned_orchestrators_version_name[name]
123+
else:
124+
return None, None
125+
126+
if version_to_use not in self.versioned_orchestrators[name]:
127+
raise VersionNotRegisteredException
128+
return self.versioned_orchestrators[name].get(version_to_use), version_to_use
98129

99-
def get_orchestrator(self, name: str) -> Optional[task.Orchestrator]:
100-
return self.orchestrators.get(name)
130+
return None, None
101131

102132
def add_activity(self, fn: task.Activity) -> str:
103133
if fn is None:
@@ -540,11 +570,22 @@ def _execute_orchestrator(
540570
try:
541571
executor = _OrchestrationExecutor(self._registry, self._logger)
542572
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
573+
574+
version = None
575+
if result.version_name:
576+
version = version or pb.OrchestrationVersion()
577+
version.name = result.version_name
578+
if result.patches:
579+
version = version or pb.OrchestrationVersion()
580+
version.patches.extend(result.patches)
581+
582+
543583
res = pb.OrchestratorResponse(
544584
instanceId=req.instanceId,
545585
actions=result.actions,
546586
customStatus=ph.get_string_value(result.encoded_custom_status),
547587
completionToken=completionToken,
588+
version=version,
548589
)
549590
except Exception as ex:
550591
self._logger.exception(
@@ -629,6 +670,11 @@ def __init__(self, instance_id: str):
629670
self._new_input: Optional[Any] = None
630671
self._save_events = False
631672
self._encoded_custom_status: Optional[str] = None
673+
self._orchestrator_started_version: Optional[pb.OrchestrationVersion] = None
674+
self._version_name: Optional[str] = None
675+
self._history_patches: dict[str, bool] = {}
676+
self._applied_patches: dict[str, bool] = {}
677+
self._encountered_patches: list[str] = []
632678

633679
def run(self, generator: Generator[task.Task, Any, Any]):
634680
self._generator = generator
@@ -705,6 +751,14 @@ def set_failed(self, ex: Exception):
705751
)
706752
self._pending_actions[action.id] = action
707753

754+
755+
def set_version_not_registered(self):
756+
self._pending_actions.clear()
757+
self._completion_status = pb.ORCHESTRATION_STATUS_STALLED
758+
action = ph.new_orchestrator_version_not_available_action(self.next_sequence_number())
759+
self._pending_actions[action.id] = action
760+
761+
708762
def set_continued_as_new(self, new_input: Any, save_events: bool):
709763
if self._is_complete:
710764
return
@@ -916,13 +970,38 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
916970
self.set_continued_as_new(new_input, save_events)
917971

918972

973+
def is_patched(self, patch_name: str) -> bool:
974+
is_patched = self._is_patched(patch_name)
975+
if is_patched:
976+
self._encountered_patches.append(patch_name)
977+
return is_patched
978+
979+
def _is_patched(self, patch_name: str) -> bool:
980+
if patch_name in self._applied_patches:
981+
return self._applied_patches[patch_name]
982+
if patch_name in self._history_patches:
983+
self._applied_patches[patch_name] = True
984+
return True
985+
986+
if self._is_replaying:
987+
self._applied_patches[patch_name] = False
988+
return False
989+
990+
self._applied_patches[patch_name] = True
991+
return True
992+
993+
919994
class ExecutionResults:
920995
actions: list[pb.OrchestratorAction]
921996
encoded_custom_status: Optional[str]
997+
version_name: Optional[str]
998+
patches: Optional[list[str]]
922999

923-
def __init__(self, actions: list[pb.OrchestratorAction], encoded_custom_status: Optional[str]):
1000+
def __init__(self, actions: list[pb.OrchestratorAction], encoded_custom_status: Optional[str], version_name: Optional[str] = None, patches: Optional[list[str]] = None):
9241001
self.actions = actions
9251002
self.encoded_custom_status = encoded_custom_status
1003+
self.version_name = version_name
1004+
self.patches = patches
9261005

9271006

9281007
class _OrchestrationExecutor:
@@ -965,6 +1044,8 @@ def execute(
9651044
for new_event in new_events:
9661045
self.process_event(ctx, new_event)
9671046

1047+
except VersionNotRegisteredException:
1048+
ctx.set_version_not_registered()
9681049
except Exception as ex:
9691050
# Unhandled exceptions fail the orchestration
9701051
ctx.set_failed(ex)
@@ -989,7 +1070,12 @@ def execute(
9891070
self._logger.debug(
9901071
f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}"
9911072
)
992-
return ExecutionResults(actions=actions, encoded_custom_status=ctx._encoded_custom_status)
1073+
return ExecutionResults(
1074+
actions=actions,
1075+
encoded_custom_status=ctx._encoded_custom_status,
1076+
version_name=getattr(ctx, '_version_name', None),
1077+
patches=ctx._encountered_patches
1078+
)
9931079

9941080
def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
9951081
if self._is_suspended and _is_suspendable(event):
@@ -1001,19 +1087,32 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
10011087
try:
10021088
if event.HasField("orchestratorStarted"):
10031089
ctx.current_utc_datetime = event.timestamp.ToDatetime()
1090+
ctx._orchestrator_started_version = event.orchestratorStarted.version
10041091
elif event.HasField("executionStarted"):
10051092
if event.router.targetAppID:
10061093
ctx._app_id = event.router.targetAppID
10071094
else:
10081095
ctx._app_id = event.router.sourceAppID
10091096

1097+
if ctx._orchestrator_started_version and ctx._orchestrator_started_version.patches:
1098+
ctx._history_patches = {patch: True for patch in ctx._orchestrator_started_version.patches}
1099+
1100+
version_name = None
1101+
if ctx._orchestrator_started_version and ctx._orchestrator_started_version.name:
1102+
version_name = ctx._orchestrator_started_version.name
1103+
1104+
10101105
# TODO: Check if we already started the orchestration
1011-
fn = self._registry.get_orchestrator(event.executionStarted.name)
1106+
fn, version_used = self._registry.get_orchestrator(event.executionStarted.name, version_name=version_name)
1107+
10121108
if fn is None:
10131109
raise OrchestratorNotRegisteredError(
10141110
f"A '{event.executionStarted.name}' orchestrator was not registered."
10151111
)
10161112

1113+
if version_used is not None:
1114+
ctx._version_name = version_used
1115+
10171116
# deserialize the input, if any
10181117
input = None
10191118
if (
@@ -1280,6 +1379,9 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
12801379
pb.ORCHESTRATION_STATUS_TERMINATED,
12811380
is_result_encoded=True,
12821381
)
1382+
elif event.HasField("executionStalled"):
1383+
# Nothing to do
1384+
pass
12831385
else:
12841386
eventType = event.WhichOneof("eventType")
12851387
raise task.OrchestrationStateError(

0 commit comments

Comments
 (0)