-
-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathsync.py
More file actions
163 lines (130 loc) · 6.35 KB
/
sync.py
File metadata and controls
163 lines (130 loc) · 6.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from typing import TYPE_CHECKING
from ..event_data import EventData
from ..event_data import TriggerData
from ..exceptions import TransitionNotAllowed
from .base import BaseEngine
if TYPE_CHECKING:
from ..transition import Transition
class SyncEngine(BaseEngine):
    """Engine that runs the state machine's event triggers from synchronous code."""

    def start(self):
        """Run the base engine's start-up and then activate the initial state."""
        super().start()
        self.activate_initial_state()

    def activate_initial_state(self):
        """
        Activate the initial state.

        Called automatically on state machine creation from sync code, but in
        async code, the user must call this method explicitly.

        Given how async works in Python, there's no built-in way to activate an initial state
        that may depend on async code from the ``StateMachine.__init__`` method.

        This simply runs :meth:`processing_loop`; when nothing is queued the call
        returns ``None`` without doing any work.
        """
        return self.processing_loop()

    def processing_loop(self):
        """Process event triggers.

        The simplest implementation is the non-RTC (synchronous) one,
        where the trigger is run immediately and its result is returned.

        .. note::

            While processing the trigger, if other events are generated, they
            will also be processed immediately, so a "nested" behavior happens.

        If the machine is on the ``rtc`` model (queued), the event is put on a queue, and only
        the first event will have its result collected.

        .. note::

            While processing the queue items, if other events are generated, they
            will be processed sequentially (and not nested).

        Returns:
            In non-RTC mode, the result of the single trigger that was processed. In RTC
            mode, the first non-sentinel result produced while draining the queue.
            ``None`` is returned when the queue is empty, when another call already holds
            the processing lock, or when the only outcome was the internal sentinel
            returned for the ``__initial__`` event.

        Raises:
            Exception: Anything raised while handling a trigger is propagated. In RTC mode
                the remaining queued triggers are discarded first.
        """
        if not self._rtc:
            # The machine is in "synchronous" mode: handle exactly one queued trigger.
            if not self._external_queue:
                # Nothing is pending (for instance the initial state was already
                # activated). Mirror the RTC branch and return ``None`` rather than
                # calling ``popleft()`` on an empty queue.
                return None
            trigger_data = self._external_queue.popleft()
            result = self._trigger(trigger_data)
            # The sentinel is an internal marker; report it as ``None`` exactly like
            # the RTC branch does.
            return None if result is self._sentinel else result

        # We make sure that only the first event enters the processing critical section,
        # next events will only be put on the queue and processed by the same loop.
        if not self._processing.acquire(blocking=False):
            return None

        # We will collect the first result as the processing result to keep backwards
        # compatibility, so we need to use a sentinel object instead of `None` because the
        # first result may also be `None`, and in that case `first_result` could be
        # overridden by another result.
        first_result = self._sentinel
        try:
            # Execute the triggers in the queue in FIFO order until the queue is empty
            while self._external_queue:
                trigger_data = self._external_queue.popleft()
                try:
                    result = self._trigger(trigger_data)
                except Exception:
                    # We clear the queue as we don't have an expected behavior
                    # and cannot keep processing
                    self._external_queue.clear()
                    raise
                if first_result is self._sentinel:
                    first_result = result
        finally:
            self._processing.release()

        return None if first_result is self._sentinel else first_result

    def _trigger(self, trigger_data: TriggerData):
        """Run the first executable transition of the current state for one trigger.

        The ``__initial__`` event activates the initial transition and returns the internal
        sentinel. For any other event, the transitions of the current state that match the
        event are the candidates:

        * if at least one candidate has a positive ``weight``, one of them is drawn at
          random using the weights and tried first, followed by the remaining weighted
          candidates in their declared order;
        * otherwise the candidates are tried in declared order (first match wins).

        Returns:
            The result of the first transition that executed, or ``None`` when none did
            and ``allow_event_without_transition`` is enabled on the state machine.

        Raises:
            TransitionNotAllowed: No candidate executed and
                ``allow_event_without_transition`` is disabled.
        """
        if trigger_data.event == "__initial__":
            transition = self._initial_transition(trigger_data)
            self._activate(trigger_data, transition)
            return self._sentinel

        state = self.sm.current_state

        # Collect all matching transitions
        matching_transitions = [t for t in state.transitions if t.match(trigger_data.event)]

        # Keep only the transitions that carry a positive weight
        weighted_transitions = [
            t for t in matching_transitions if t.weight is not None and t.weight > 0
        ]

        if weighted_transitions:
            # Draw one weighted transition at random and try it first; if its conditions
            # fail, fall back to the other weighted transitions in declared order.
            # NOTE(review): matching transitions without a positive weight are never
            # tried on this path -- confirm this is the intended behavior.
            weights = [t.weight for t in weighted_transitions]
            selected = self.sm._random.choices(weighted_transitions, weights=weights, k=1)[0]
            candidates = [selected]
            # Identity, not equality: only the object already tried must be skipped.
            candidates.extend(t for t in weighted_transitions if t is not selected)
        else:
            # Otherwise, use first-match behavior (backward compatible)
            candidates = matching_transitions

        for transition in candidates:
            executed, result = self._activate(trigger_data, transition)
            if executed:
                return result

        if not self.sm.allow_event_without_transition:
            raise TransitionNotAllowed(trigger_data.event, state)
        return None

    def _activate(self, trigger_data: TriggerData, transition: "Transition"):
        """Try to execute ``transition`` for ``trigger_data``, running its callbacks.

        Callbacks run in this order: validators, conditions, ``before``, source ``exit``
        (skipped for internal transitions or when there is no source), ``on``, then the
        current state is switched to the target, target ``enter`` (skipped for internal
        transitions) and finally ``after``.

        Returns:
            ``(False, None)`` when the transition's conditions are not all satisfied.
            Otherwise ``(True, result)`` where ``result`` combines what the ``before`` and
            ``on`` callbacks returned: ``None`` if nothing was collected, the single value
            if exactly one was collected, or the whole collection otherwise.
        """
        event_data = EventData(trigger_data=trigger_data, transition=transition)
        args, kwargs = event_data.args, event_data.extended_kwargs

        # Validators run before the conditions are evaluated.
        self.sm._callbacks.call(transition.validators.key, *args, **kwargs)
        if not self.sm._callbacks.all(transition.cond.key, *args, **kwargs):
            return False, None

        source = transition.source
        target = transition.target

        result = self.sm._callbacks.call(transition.before.key, *args, **kwargs)
        if source is not None and not transition.internal:
            self.sm._callbacks.call(source.exit.key, *args, **kwargs)

        result += self.sm._callbacks.call(transition.on.key, *args, **kwargs)

        # From here on, callbacks observe the target as the active state.
        self.sm.current_state = target
        event_data.state = target
        kwargs["state"] = target

        if not transition.internal:
            self.sm._callbacks.call(target.enter.key, *args, **kwargs)
        self.sm._callbacks.call(transition.after.key, *args, **kwargs)

        # Unwrap the collected results: nothing -> None, one item -> the item itself.
        if len(result) == 0:
            result = None
        elif len(result) == 1:
            result = result[0]

        return True, result