-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.py
More file actions
394 lines (345 loc) · 15 KB
/
task.py
File metadata and controls
394 lines (345 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
"""Random values workflow plugin module"""
import json
from collections.abc import Sequence
from dataclasses import dataclass
from http import HTTPStatus
from time import sleep
from cmem.cmempy.api import config, get_json
from cmem.cmempy.workflow.workflow import execute_workflow_io, get_workflows_io
from cmem_plugin_base.dataintegration.context import ExecutionContext, ExecutionReport
from cmem_plugin_base.dataintegration.description import Icon, Plugin, PluginParameter
from cmem_plugin_base.dataintegration.entity import (
Entities,
Entity,
EntityPath,
EntitySchema,
)
from cmem_plugin_base.dataintegration.plugins import PluginLogger, WorkflowPlugin
from cmem_plugin_base.dataintegration.ports import FixedNumberOfInputs, FlexibleSchemaPort
from cmem_plugin_base.dataintegration.typed_entities.file import FileEntitySchema
from cmem_plugin_base.dataintegration.types import (
BoolParameterType,
IntParameterType,
StringParameterType,
)
from cmem_plugin_base.dataintegration.utils import setup_cmempy_user_access
from requests import HTTPError
from cmem_plugin_loopwf import exceptions
from cmem_plugin_loopwf.workflow_type import SuitableWorkflowParameterType
DOCUMENTATION = """Run another workflow once per incoming entity.
## Overview
- **Per-entity execution**: For every entity on the input port, this task starts one selected
sub-workflow.
- **Execution modes**: Runs sequentially by default or in parallel with a configurable
concurrency.
- **Input handover**: Each entity is converted to a JSON object and provided to the sub-workflow
via its single replaceable (variable) input dataset.
- **Optional pass-through**: Optionally forwards the original input entities to the output port;
it never returns results produced by the sub-workflow.
- **File support (beta)**: When processing file entities and a `input_mime_type` is set, the file
content is sent to the sub-workflow instead of the file metadata.
## How It Works
1. Read entities from the single input port (flexible schema).
2. Convert each entity to a flat JSON object using the entity schema (one value per path required).
3. Start the chosen sub-workflow once per entity, supplying the JSON as the replaceable
input dataset.
4. Run up to `parallel_execution` workflow instances at the same time.
5. Stop with an error if any sub-workflow fails; see details in Activities.
Example entity mapping (illustrative):
Input schema paths: `label`, `id` → JSON payload: `{ "label": "Example", "id": "123" }`
## Requirements
- The selected workflow must be in the same project as this task.
- The selected workflow must have exactly one replaceable input dataset.
- The input entities must be flat: each schema path may have at most one value per entity.
## Limitations
- Nested or multi-valued entities are not supported; multiple values per path raise an error.
- The replaceable dataset of the sub-workflow must be a JSON dataset.
- No circular dependency detection is performed.
- File processing is beta; correct `input_mime_type` and a file-accepting dataset in the
sub-workflow are required.
## Troubleshooting
- "Need a connected input task": Connect one upstream task to provide entities.
- "Can process a single input only": Only one input port is supported.
- "Multiple values for entity path": Ensure each path has at most one value.
- "Workflow ... does not exist ... or is missing a single replaceable input dataset": Select
a workflow in the same project with exactly one variable input.
## Typical Use Cases
- Per-record processing pipelines (e.g., validation, enrichment, export).
- Batch operations that require complex per-entity logic encapsulated in a workflow.
- Quality checks where each entity must pass through a dedicated validation workflow.
"""
@dataclass
class WorkflowExecution:
"""Represents the status of a concrete workflow execution"""
task_id: str
project_id: str
entity: Entity
schema: EntitySchema
instance_id: str | None = None
activity_id: str | None = None
status: str = "QUEUED"
is_running: bool = False
raw: dict[str, str] | None = None
execution_context: ExecutionContext | None = None
logger: PluginLogger | None = None
input_mime_type: str = ""
@property
def is_finished(self) -> bool:
"""True if the workflow is finished"""
return self.status.upper() == "FINISHED"
@property
def is_queued(self) -> bool:
"""True if workflow is queued"""
return self.status.upper() == "QUEUED"
def entity_as_json_str(self) -> str:
"""Return the entity as a JSON string"""
entity_as_dict = StartWorkflow.entity_to_dict(entity=self.entity, schema=self.schema)
return json.dumps(entity_as_dict)
def start(self) -> bool:
"""Start the workflow"""
if self.logger:
self.logger.info(f"Starting workflow execution: {self.entity_as_json_str()}")
try:
if self.execution_context:
setup_cmempy_user_access(context=self.execution_context.user)
if self.schema.type_uri == FileEntitySchema().type_uri and self.input_mime_type != "":
response = execute_workflow_io(
project_name=self.project_id,
task_name=self.task_id,
input_file=self.entity.values[0][0],
input_mime_type=self.input_mime_type,
)
# workflows are NOT executed async at the moment
self.status = "FINISHED"
return True
response = get_json(
f"{config.get_di_api_endpoint()}/api/workflow/executeAsync/{self.project_id}/{self.task_id}",
headers={"Content-Type": "application/json"},
method="POST",
data=self.entity_as_json_str(),
)
except HTTPError as error:
if error.response.status_code == HTTPStatus.SERVICE_UNAVAILABLE:
# 503 - no more execution capacity > no status change
return False
raise ValueError(str(error)) from error
self.instance_id = response["instanceId"]
self.activity_id = response["activityId"]
self.update()
return True
def wait_until_finished(self) -> None:
"""Wait until the workflow is finished"""
while self.is_running:
self.update()
sleep(1)
def update(self) -> None:
"""Update the execution status"""
response = get_json(
f"{config.get_di_api_endpoint()}/workspace/activities/status",
params={
"project": self.project_id,
"task": self.task_id,
"activity": self.activity_id,
"instance": self.instance_id,
},
)
self.status = response["statusName"]
self.is_running = response["isRunning"]
self.raw = response
if self.logger:
self.logger.debug(f"Updated Status: {self!s}")
@dataclass
class WorkflowExecutionList:
"""Workflow execution status list / registry"""
statuses: list[WorkflowExecution]
context: ExecutionContext
logger: PluginLogger
def __init__(self):
self.statuses = []
def execute(self, parallel_execution: int) -> None:
"""Execute all workflow executions"""
while self.queued > 0:
while self.running < parallel_execution and self.queued > 0:
self.start_next()
self.report()
self.wait_until_finished()
self.report()
def start_next(self) -> bool:
"""Start the next workflow execution in queue"""
all_queued = [_ for _ in self.statuses if _.is_queued]
if not all_queued:
return False
next_in_queue: WorkflowExecution = all_queued[0]
return next_in_queue.start()
def wait_until_finished(self, polling_time: int = 1) -> None:
"""Wait until all running workflows are finished"""
while self.running > 0:
sleep(polling_time)
self.update_running_status()
def update_running_status(self) -> None:
"""Update status of running workflows"""
for _ in self.statuses:
if _.is_running:
_.update()
def append(self, status: WorkflowExecution) -> None:
"""Append a workflow execution to the list"""
self.statuses.append(status)
def report(self) -> None:
"""Report workflow statuses to the logger and/or execution report from context"""
line = f"finished ({self.running} running, {self.queued} queued)"
self.context.report.update(
ExecutionReport(
entity_count=self.finished,
operation="start",
operation_desc=line,
)
)
self.logger.info(f"{self.finished} {line}")
@property
def running(self) -> int:
"""Returns the number of running workflows"""
return len([_ for _ in self.statuses if _.is_running])
@property
def finished(self) -> int:
"""Returns the number of finished workflows"""
return len([_ for _ in self.statuses if _.is_finished])
@property
def queued(self) -> int:
"""Returns the number of queued workflows"""
return len([_ for _ in self.statuses if _.is_queued])
@Plugin(
label="Start Workflow per Entity",
description="Loop over the output of a task and start a sub-workflow for each entity.",
documentation=DOCUMENTATION,
icon=Icon(package=__package__, file_name="loopwf.svg"),
plugin_id="cmem_plugin_loopwf-task-StartWorkflow",
parameters=[
PluginParameter(
name="workflow",
label="Workflow",
param_type=SuitableWorkflowParameterType(),
description="Which workflow do you want to start per entity.",
),
PluginParameter(
name="parallel_execution",
label="How many workflow jobs should run in parallel?",
param_type=IntParameterType(),
default_value=1,
),
PluginParameter(
name="forward_entities",
label="Forward incoming entities to the output port?",
param_type=BoolParameterType(),
default_value=False,
),
PluginParameter(
name="input_mime_type",
label="Mime-type for file by file processing (beta)",
description="When working with file entities, setting this to a proper value will send"
" the file to the"
" workflow instead of the meta-data. Examples are: 'application/x-plugin-binaryFile',"
" 'application/json', 'application/xml', 'text/csv', 'application/octet-stream' or"
" 'application/x-plugin-excel'.",
param_type=StringParameterType(),
default_value="",
advanced=True,
),
],
)
class StartWorkflow(WorkflowPlugin):
"""Start Workflow per Entity"""
context: ExecutionContext
executions: WorkflowExecutionList
def __init__(
self,
workflow: str,
parallel_execution: int = 1,
forward_entities: bool = False,
input_mime_type: str = "",
) -> None:
self.workflow = workflow
if parallel_execution < 1:
raise ValueError("parallel_execution must be >= 1")
self.parallel_execution = parallel_execution
self.forward_entities = forward_entities
self.input_mime_type = input_mime_type
self.input_ports = FixedNumberOfInputs([FlexibleSchemaPort()])
self.output_port = FlexibleSchemaPort() if forward_entities else None
self.workflows_started = 0
self.executions = WorkflowExecutionList()
def start_workflows(self, inputs: Sequence[Entities]) -> Entities:
"""Start the workflows and return output entities"""
input_entities = inputs[0].entities
schema = inputs[0].schema
self.executions.context = self.context
self.executions.logger = self.log
self.executions.report()
for entity in input_entities:
new_execution = WorkflowExecution(
task_id=self.workflow,
project_id=self.context.task.project_id(),
entity=entity,
schema=schema,
execution_context=self.context,
logger=self.log,
input_mime_type=self.input_mime_type,
)
self.log.info(f"Got new entity: {new_execution.entity_as_json_str()}")
self.executions.append(new_execution)
self.executions.report()
self.executions.execute(parallel_execution=self.parallel_execution)
# remove execution via /workflow/workflows/{project}/{task}/execution/{executionId}
return Entities(
schema=schema,
entities=iter([_.entity for _ in self.executions.statuses]),
)
def execute(
self,
inputs: Sequence[Entities],
context: ExecutionContext,
) -> Entities | None:
"""Run the workflow operator."""
self.log.info("Start execute")
self.context = context
self.validate_inputs(inputs=inputs)
self.validate_workflow(workflow=self.workflow)
output_entities = self.start_workflows(inputs=inputs)
if self.forward_entities:
self.log.info("All done ... forward entities")
return output_entities
self.log.info("All done ...")
return None
@staticmethod
def validate_inputs(inputs: Sequence[Entities]) -> None:
"""Validate inputs."""
inputs_count = len(inputs)
if inputs_count == 0:
raise exceptions.MissingInputError("Need a connected input task to get data from.")
if inputs_count > 1:
raise exceptions.TooManyInputsError("Can process a single input only.")
def validate_workflow(self, workflow: str) -> None:
"""Validate a workflow (ID)"""
current_project = self.context.task.project_id()
setup_cmempy_user_access(context=self.context.user)
suitable_workflows: dict[str, dict] = {
f"{_['id']}": _
for _ in get_workflows_io()
if self.context.task.project_id() == _["projectId"] and len(_["variableInputs"]) == 1
}
if workflow not in suitable_workflows:
raise exceptions.NoSuitableWorkflowError(
f"Workflow '{workflow}' does not exist in project '{current_project}'"
" or is missing a single replaceable input dataset."
)
self.log.info(str(suitable_workflows))
@staticmethod
def entity_to_dict(entity: Entity, schema: EntitySchema) -> dict:
"""Convert an entity to a dictionary, using the schema"""
path: EntityPath
values: Sequence[str]
entity_dict = {}
for path, values in zip(schema.paths, entity.values, strict=True):
if len(values) > 1:
raise exceptions.MultipleValuesError(f"Multiple values for entity path {path.path}")
entity_dict[path.path] = values[0] if len(values) == 1 else ""
return entity_dict