This repository was archived by the owner on Apr 30, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils_service.py
More file actions
110 lines (89 loc) · 3.76 KB
/
utils_service.py
File metadata and controls
110 lines (89 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from typing import Any
from pydantic import BaseModel
from pydantic import Extra
from pydantic.types import StrictInt
from pydantic.types import StrictStr
from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.type_aliases import DictAny
from frinx.common.type_aliases import ListAny
from frinx.common.type_aliases import ListStr
from frinx.common.util import json_parse
from frinx.common.util import snake_to_camel_case
from frinx.common.util import validate_structure
from frinx.common.worker.service import ServiceWorkersImpl
from frinx.common.worker.task_def import TaskDefinition
from frinx.common.worker.task_def import TaskInput
from frinx.common.worker.task_def import TaskOutput
from frinx.common.worker.task_result import TaskResult
from frinx.common.worker.worker import WorkerImpl
# TODO: move to other file, e.g. local/util.py
class DynamicTask(BaseModel):
    # Expected shape of a single dynamic task entry. ForkJoinInputValidator hands
    # this model to validate_structure for every parsed task.

    class _SubWorkflowParam(BaseModel):
        # Nested model for the `sub_workflow_param` field: a name and a version,
        # both declared with pydantic strict types.
        name: StrictStr
        version: StrictInt

    # Field order is kept exactly as declared originally.
    name: StrictStr
    task_reference_name: StrictStr
    type: StrictStr
    sub_workflow_param: _SubWorkflowParam

    class Config:  # type: ignore[pydantic-alias]
        # Aliases are generated by snake_to_camel_case, and keys that are not
        # declared on the model are forbidden.
        alias_generator = snake_to_camel_case
        extra = Extra.forbid
class UtilsService(ServiceWorkersImpl):
    """Utility workers: JSON parsing and input validation for Dynamic_Fork."""

    class JSONParse(WorkerImpl):
        """Worker that passes its input string to ``json_parse`` and reports the outcome."""

        class WorkerDefinition(TaskDefinition):
            # Task name, description and labels of this worker.
            name: str = 'JSON_parse'
            description: str = 'Returns object from JSON or errors if occurs.'
            labels: ListStr = ['UTILS']

        class WorkerInput(TaskInput):
            # Raw text forwarded to json_parse.
            input: str

        class WorkerOutput(TaskOutput):
            # `result` is the value returned by json_parse; `errors` is the
            # collected error list, or None when that list is empty.
            result: DictAny | ListAny | None
            errors: ListStr | None

        def execute(self, worker_input: WorkerInput) -> TaskResult[Any]:
            """Run ``json_parse`` on ``worker_input.input`` and wrap the outcome.

            The task status is always COMPLETED; any problems are reported
            through the ``errors`` field of the output instead of the status.
            """
            parsed, problems = json_parse(input=worker_input.input)
            output = self.WorkerOutput(result=parsed, errors=problems or None)
            return TaskResult(status=TaskResultStatus.COMPLETED, output=output)

    class ForkJoinInputValidator(WorkerImpl):
        """Worker that checks the ``dynamic_tasks`` input against ``DynamicTask``."""

        class WorkerDefinition(TaskDefinition):
            # Task name, description and labels of this worker.
            name: str = 'fork_join_input_validator'
            description: str = 'Input validator for Dynamic_Fork / 1.'
            labels: ListStr = ['UTILS']

        class WorkerInput(TaskInput):
            # `dynamic_tasks` and `dynamic_tasks_input` arrive as strings and are
            # passed to json_parse; `expected_type` / `expected_name` are the
            # values handed to validate_structure for the 'type' / 'name' properties.
            dynamic_tasks: str
            expected_type: str
            expected_name: str
            dynamic_tasks_input: str

        class WorkerOutput(TaskOutput):
            # `result` carries both parsed inputs when no error was collected,
            # otherwise None; `errors` is the error list, or None when empty.
            result: DictAny | ListAny | None
            errors: ListStr | None

        def execute(self, worker_input: WorkerInput) -> TaskResult[Any]:
            """Parse both inputs and validate every dynamic task entry.

            Errors from both ``json_parse`` calls and from each
            ``validate_structure`` call are threaded through one running
            value. The task status is always COMPLETED; when any error was
            collected, ``result`` is None and ``errors`` holds the list.
            """
            # TODO: validate also dynamic_tasks_input
            tasks, problems = json_parse(dynamic_tasks=worker_input.dynamic_tasks)
            tasks_input, problems = json_parse(
                problems, dynamic_tasks_input=worker_input.dynamic_tasks_input
            )

            # A falsy parse result (None, empty container, ...) means nothing to check.
            for position, task in enumerate(tasks or []):
                # A fresh properties mapping is built for every entry.
                required = {
                    'type': worker_input.expected_type,
                    'name': worker_input.expected_name
                }
                problems = validate_structure(
                    task, DynamicTask,
                    properties=required,
                    errors=problems,
                    idx=position
                )

            if problems:
                payload = None
            else:
                payload = {
                    'dynamic_tasks': tasks,
                    'dynamic_tasks_input': tasks_input
                }

            output = self.WorkerOutput(result=payload, errors=problems or None)
            return TaskResult(status=TaskResultStatus.COMPLETED, output=output)