forked from serverlessworkflow/sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcallback_state.py
More file actions
79 lines (65 loc) · 3.15 KB
/
callback_state.py
File metadata and controls
79 lines (65 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from __future__ import annotations
import copy
from serverlessworkflow.sdk.action import Action
from serverlessworkflow.sdk.callback_state_timeout import CallbackStateTimeOut
from serverlessworkflow.sdk.end import End
from serverlessworkflow.sdk.error import Error
from serverlessworkflow.sdk.event_data_filter import EventDataFilter
from serverlessworkflow.sdk.metadata import Metadata
from serverlessworkflow.sdk.state import State
from serverlessworkflow.sdk.state_data_filter import StateDataFilter
from serverlessworkflow.sdk.swf_base import ComplexTypeOf, ArrayTypeOf, HydratableParameter, SimpleTypeOf, \
UnionTypeOf, SwfBase
from serverlessworkflow.sdk.transition import Transition
class CallbackState(State, SwfBase):
    """Workflow state model whose ``type`` defaults to ``'callback'``.

    Every field is optional and defaults to ``None`` at class level. The
    constructor forwards its arguments to ``SwfBase.__init__`` together with
    :meth:`f_hydration`, which states the SDK type each field is hydrated as.
    """

    id: str = None
    name: str = None
    type: str = None
    # Hydrated as the complex SDK types named in the annotations.
    action: Action = None
    eventRef: str = None
    timeouts: CallbackStateTimeOut = None
    eventDataFilter: EventDataFilter = None
    stateDataFilter: StateDataFilter = None
    # Hydrated as an array of Error objects.
    onErrors: list[Error] = None
    # Either a plain str or a Transition object.
    transition: str | Transition = None
    # Either a plain bool or an End object.
    end: bool | End = None
    compensatedBy: str = None
    usedForCompensation: bool = None
    metadata: Metadata = None

    def __init__(self,
                 id: str = None,
                 name: str = None,
                 type: str = None,
                 action: Action = None,
                 eventRef: str = None,
                 timeouts: CallbackStateTimeOut = None,
                 eventDataFilter: EventDataFilter = None,
                 stateDataFilter: StateDataFilter = None,
                 onErrors: list[Error] = None,
                 transition: str | Transition = None,
                 end: bool | End = None,
                 compensatedBy: str = None,
                 usedForCompensation: bool = None,
                 metadata: Metadata = None,
                 **kwargs):
        """Initialise the state by delegating to ``SwfBase.__init__``.

        All named parameters mirror the class-level fields and default to
        ``None``. Extra keyword arguments are collected in ``kwargs`` and
        passed on to ``SwfBase.__init__`` unchanged.
        """
        # Defaults handed to SwfBase alongside the arguments.
        _default_values = {'type': 'callback', 'usedForCompensation': False, }

        # NOTE: locals() snapshots every name bound in this frame (self, the
        # named parameters, kwargs and _default_values). Do not introduce
        # further local variables above this call, or they would be included
        # in the mapping given to SwfBase.__init__.
        SwfBase.__init__(self, locals(), kwargs, CallbackState.f_hydration, _default_values)

    @staticmethod
    def f_hydration(p_key, p_value):
        """Hydrate ``p_value`` according to the field named ``p_key``.

        Args:
            p_key: Field name (a string such as ``'action'`` or ``'end'``).
            p_value: Raw value supplied for that field.

        Returns:
            The result of ``HydratableParameter(value=p_value).hydrateAs(...)``
            for fields with a registered type descriptor; otherwise a deep
            copy of ``p_value``.
        """
        # Factories rather than instances: a descriptor is only built for the
        # field that is actually being hydrated.
        descriptor_factories = {
            'action': lambda: ComplexTypeOf(Action),
            'timeouts': lambda: ComplexTypeOf(CallbackStateTimeOut),
            'eventDataFilter': lambda: ComplexTypeOf(EventDataFilter),
            'stateDataFilter': lambda: ComplexTypeOf(StateDataFilter),
            'onErrors': lambda: ArrayTypeOf(Error),
            'transition': lambda: UnionTypeOf([SimpleTypeOf(str),
                                               ComplexTypeOf(Transition)]),
            'end': lambda: UnionTypeOf([SimpleTypeOf(bool),
                                        ComplexTypeOf(End)]),
        }

        make_descriptor = descriptor_factories.get(p_key)
        if make_descriptor is None:
            # No special handling registered for this field: hand back an
            # independent copy of the raw value.
            return copy.deepcopy(p_value)

        return HydratableParameter(value=p_value).hydrateAs(make_descriptor())