-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathworkflow_client.py
More file actions
137 lines (111 loc) · 4.71 KB
/
workflow_client.py
File metadata and controls
137 lines (111 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional, List, Dict
from conductor.client.http.models import WorkflowRun, SkipTaskRequest, WorkflowStatus, \
ScrollableSearchResultWorkflowSummary, SignalResponse, WorkflowMessage
from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest
from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
from conductor.client.http.models.workflow import Workflow
from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate
from conductor.client.http.models.workflow_test_request import WorkflowTestRequest
class WorkflowClient(ABC):
    """Abstract contract for a client that operates on workflow executions.

    Every method below is declared with ``@abstractmethod`` and contains no
    logic, so a subclass must override all of them before it can be
    instantiated. The per-method notes describe only what the signatures
    show; anything marked "presumably" is an assumption to confirm against
    the implementing client.
    """

    @abstractmethod
    def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str:
        """Start a workflow from ``start_workflow_request``.

        Returns:
            A string; presumably the id of the started workflow.
        """

    @abstractmethod
    def get_workflow(
        self,
        workflow_id: str,
        include_tasks: Optional[bool] = True,
    ) -> Workflow:
        """Look up the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to fetch.
            include_tasks: Defaults to ``True``; presumably controls whether
                task details are part of the returned ``Workflow``.
        """

    @abstractmethod
    def get_workflow_status(
        self,
        workflow_id: str,
        include_output: Optional[bool] = None,
        include_variables: Optional[bool] = None,
    ) -> WorkflowStatus:
        """Look up the status of the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to inspect.
            include_output: Optional flag, ``None`` by default; presumably
                requests the workflow output in the result.
            include_variables: Optional flag, ``None`` by default; presumably
                requests the workflow variables in the result.
        """

    @abstractmethod
    def delete_workflow(
        self,
        workflow_id: str,
        archive_workflow: Optional[bool] = True,
    ):
        """Delete the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to delete.
            archive_workflow: Defaults to ``True``; presumably asks for the
                workflow to be archived as part of the deletion.
        """

    @abstractmethod
    def terminate_workflow(
        self,
        workflow_id: str,
        reason: Optional[str] = None,
        trigger_failure_workflow: bool = False,
    ):
        """Terminate the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to terminate.
            reason: Optional free-form explanation; ``None`` by default.
            trigger_failure_workflow: Defaults to ``False``; presumably
                decides whether a failure workflow is started as well.
        """

    @abstractmethod
    def execute_workflow(
            self,
            start_workflow_request: StartWorkflowRequest,
            request_id: Optional[str] = None,
            wait_until_task_ref: Optional[str] = None,
            wait_for_seconds: int = 30,
    ) -> WorkflowRun:
        """Run a workflow from ``start_workflow_request`` and return a ``WorkflowRun``.

        Args:
            start_workflow_request: Description of the workflow to run.
            request_id: Optional caller-supplied request identifier.
            wait_until_task_ref: Optional task reference name; presumably the
                call waits until that task is reached.
            wait_for_seconds: Defaults to ``30``; presumably the longest time
                the call waits before returning.
        """

    @abstractmethod
    def execute_workflow_with_return_strategy(
            self,
            start_workflow_request: StartWorkflowRequest,
            request_id: Optional[str] = None,
            wait_until_task_ref: Optional[str] = None,
            wait_for_seconds: int = 30,
            consistency: Optional[str] = None,
            return_strategy: Optional[str] = None,
    ) -> SignalResponse:
        """Run a workflow like ``execute_workflow`` but return a ``SignalResponse``.

        Args:
            start_workflow_request: Description of the workflow to run.
            request_id: Optional caller-supplied request identifier.
            wait_until_task_ref: Optional task reference name; presumably the
                call waits until that task is reached.
            wait_for_seconds: Defaults to ``30``; presumably the longest time
                the call waits before returning.
            consistency: Optional string; the accepted values are not visible
                in this interface.
            return_strategy: Optional string; the accepted values are not
                visible in this interface.
        """

    @abstractmethod
    def pause_workflow(self, workflow_id: str):
        """Pause the workflow identified by ``workflow_id``."""

    @abstractmethod
    def resume_workflow(self, workflow_id: str):
        """Resume the workflow identified by ``workflow_id``."""

    @abstractmethod
    def restart_workflow(
        self,
        workflow_id: str,
        use_latest_def: Optional[bool] = False,
    ):
        """Restart the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to restart.
            use_latest_def: Defaults to ``False``; presumably selects the
                latest workflow definition for the restarted run.
        """

    @abstractmethod
    def retry_workflow(
        self,
        workflow_id: str,
        resume_subworkflow_tasks: Optional[bool] = False,
    ):
        """Retry the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to retry.
            resume_subworkflow_tasks: Defaults to ``False``; presumably
                controls whether sub-workflow tasks are resumed too.
        """

    @abstractmethod
    def rerun_workflow(
        self,
        workflow_id: str,
        rerun_workflow_request: RerunWorkflowRequest,
    ):
        """Rerun the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to rerun.
            rerun_workflow_request: Request object describing the rerun.
        """

    @abstractmethod
    def skip_task_from_workflow(
        self,
        workflow_id: str,
        task_reference_name: str,
        request: SkipTaskRequest,
    ):
        """Skip one task of the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow that owns the task.
            task_reference_name: Reference name of the task to skip.
            request: Request object accompanying the skip.
        """

    @abstractmethod
    def test_workflow(self, test_request: WorkflowTestRequest) -> Workflow:
        """Test a workflow using ``test_request`` and return a ``Workflow``."""

    @abstractmethod
    def search(
        self,
        start: int = 0,
        size: int = 100,
        free_text: str = "*",
        query: Optional[str] = None,
    ) -> ScrollableSearchResultWorkflowSummary:
        """Search workflows and return a scrollable result of summaries.

        Args:
            start: Defaults to ``0``; presumably the offset of the first hit.
            size: Defaults to ``100``; presumably the maximum number of hits.
            free_text: Defaults to ``"*"``; presumably a free-text filter.
            query: Optional query string; ``None`` by default.
        """

    @abstractmethod
    def get_by_correlation_ids_in_batch(
            self,
            batch_request: CorrelationIdsSearchRequest,
            include_completed: bool = False,
            include_tasks: bool = False,
    ) -> Dict[str, List[Workflow]]:
        """Look up workflows for the correlation ids in ``batch_request``.

        Args:
            batch_request: Request object carrying the search criteria.
            include_completed: Defaults to ``False``; presumably includes
                completed workflows when set.
            include_tasks: Defaults to ``False``; presumably includes task
                details when set.

        Returns:
            A dict from string keys (presumably correlation ids) to lists of
            ``Workflow`` objects.
        """

    @abstractmethod
    def get_by_correlation_ids(
            self,
            workflow_name: str,
            correlation_ids: List[str],
            include_completed: bool = False,
            include_tasks: bool = False,
    ) -> Dict[str, List[Workflow]]:
        """Look up workflows named ``workflow_name`` by correlation id.

        Args:
            workflow_name: Name of the workflow to search for.
            correlation_ids: Correlation ids to look up.
            include_completed: Defaults to ``False``; presumably includes
                completed workflows when set.
            include_tasks: Defaults to ``False``; presumably includes task
                details when set.

        Returns:
            A dict from string keys (presumably correlation ids) to lists of
            ``Workflow`` objects.
        """

    @abstractmethod
    def remove_workflow(self, workflow_id: str):
        """Remove the workflow identified by ``workflow_id``.

        How this differs from ``delete_workflow`` is not visible in this
        interface; check the implementing client.
        """

    @abstractmethod
    def update_variables(
        self,
        workflow_id: str,
        variables: Optional[Dict[str, object]] = None,
    ) -> None:
        """Update variables of the workflow identified by ``workflow_id``.

        Args:
            workflow_id: Identifier of the workflow to update.
            variables: Optional dict of variable names to values; ``None`` by
                default.
        """

    @abstractmethod
    def update_state(
        self,
        workflow_id: str,
        update_requesst: WorkflowStateUpdate,
        wait_until_task_ref_names: Optional[List[str]] = None,
        wait_for_seconds: Optional[int] = None,
    ) -> WorkflowRun:
        """Apply a state update to a workflow and return a ``WorkflowRun``.

        Args:
            workflow_id: Identifier of the workflow to update.
            update_requesst: The state update to apply. The spelling is part
                of the public signature and is kept so keyword callers keep
                working.
            wait_until_task_ref_names: Optional list of task reference names;
                presumably the call waits until one of them is reached.
            wait_for_seconds: Optional wait time; ``None`` by default.
        """

    @abstractmethod
    def send_message(self, workflow_id: str, message: Dict[str, object]) -> str:
        """Push a message onto a running workflow's message queue (WMQ).

        The server must have conductor.workflow-message-queue.enabled=true.

        Args:
            workflow_id: ID of the running workflow instance.
            message: Arbitrary JSON-serialisable dict delivered to the workflow.

        Returns:
            The UUID string the server assigned to the message.
        """