-
-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathcontroller.py
More file actions
265 lines (224 loc) · 11.1 KB
/
controller.py
File metadata and controls
265 lines (224 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import logging
import math
from time import sleep
from morango.constants import transfer_stages
from morango.constants import transfer_statuses
from morango.registry import session_middleware
from morango.sync.operations import _deserialize_from_store
from morango.sync.operations import OperationLogger
from morango.sync.stream.serialize import serialize_into_store
from morango.sync.utils import SyncSignalGroup
from morango.utils import _assert
logger = logging.getLogger(__name__)
class MorangoProfileController(object):
    """
    Controller bound to a single Morango profile, offering serialization into the store,
    deserialization out of the store, and factory methods for sync connections.
    """

    def __init__(self, profile):
        """
        :param profile: The profile this controller operates on; checked with `_assert`
        """
        _assert(profile, "profile needs to be defined.")
        self.profile = profile

    def serialize_into_store(self, sync_filter=None):
        """
        Takes data from app layer and serializes the models into the store.

        :param sync_filter: Optional filter passed through to the serialization operation
        """
        with OperationLogger("Serializing records", "Serialization complete"):
            serialize_into_store(self.profile, sync_filter=sync_filter)

    def deserialize_from_store(self, skip_erroring=False, sync_filter=None):
        """
        Takes data from the store and integrates into the application.

        :param skip_erroring: Passed through to the deserialization operation
        :param sync_filter: Optional filter applied to both the serialization and the
            deserialization steps
        """
        with OperationLogger("Deserializing records", "Deserialization complete"):
            # we first serialize to avoid deserialization merge conflicts
            serialize_into_store(self.profile, sync_filter=sync_filter)
            _deserialize_from_store(
                self.profile, filter=sync_filter, skip_erroring=skip_erroring
            )

    def create_network_connection(self, base_url, **kwargs):
        """
        Builds a `NetworkSyncConnection` for the given base URL.

        :param base_url: The URL handed to the connection as its `base_url` keyword argument
        :param kwargs: Any additional keyword arguments for `NetworkSyncConnection`
        :return: The new connection object
        """
        # imported at call time rather than module level -- presumably to avoid a circular
        # import with `morango.sync.syncsession`; confirm before hoisting
        from morango.sync.syncsession import NetworkSyncConnection

        kwargs.update(base_url=base_url)
        return NetworkSyncConnection(**kwargs)

    def create_disk_connection(self, path):
        """
        Placeholder for disk based connections, which are not implemented.

        :param path: Unused
        :raises NotImplementedError: always
        """
        raise NotImplementedError("Coming soon...")
class SessionControllerSignals(object):
    """
    Container holding one `SyncSignalGroup` per transfer stage, each stored as an attribute
    named after its stage.
    """

    __slots__ = transfer_stages.ALL

    def __init__(self):
        """
        Creates a signal group, without context, for every transfer stage
        """
        for stage_name in transfer_stages.ALL:
            setattr(self, stage_name, SyncSignalGroup(context=None))

    def connect(self, handler):
        """
        Registers the handler with the signal group of each transfer stage

        :param handler: callable
        """
        for stage_name in transfer_stages.ALL:
            getattr(self, stage_name).connect(handler)
class SessionController(object):
    """
    Controller class that is used to execute transfer operations, like queuing and serializing,
    but does so through the middleware registry, which allows customization of how those transfer
    stage operations are handled
    """

    __slots__ = (
        "middleware",
        "signals",
        "context",
    )

    def __init__(self, middleware, signals, context=None):
        """
        :type middleware: morango.registry.SessionMiddlewareRegistry|list
        :type signals: SessionControllerSignals
        :type context: morango.sync.context.SessionContext|None
        """
        self.middleware = middleware
        self.signals = signals
        self.context = context

    @classmethod
    def build(cls, middleware=None, signals=None, context=None):
        """
        Factory method that instantiates the controller with the specified context, defaulting
        to the global middleware registry `session_middleware` and a new set of signals

        :type middleware: morango.registry.SessionMiddlewareRegistry|list|None
        :type signals: SessionControllerSignals|None
        :type context: morango.sync.context.SessionContext|None
        :return: A new transfer controller, of the class this was invoked on
        :rtype: SessionController
        """
        # NOTE: falsy values (including an empty list) fall back to the defaults
        middleware = middleware or session_middleware
        signals = signals or SessionControllerSignals()
        # instantiate through `cls` so that subclasses get an instance of their own type
        return cls(middleware, signals, context=context)

    def proceed_to(self, target_stage, context=None):
        """
        Calls middleware that operates on stages between the current stage and the `target_stage`.
        The middleware are called incrementally, but in order to proceed to the next stage, the
        middleware must return a complete status. If the middleware does not return a complete
        status, that status is returned indicating that the method call has not reached
        `target_stage`. Therefore middleware can perform operations asynchronously and this can be
        repeatedly called to move forward through the transfer stages and their operations

        When invoking the middleware, if the status result is:
            PENDING: The controller will continue to invoke the middleware again when this method
                is called repeatedly, until the status result changes
            STARTED: The controller will not invoke any middleware until the status changes,
                and assumes the "started" operation will update the state itself
            COMPLETED: The controller will proceed to invoke the middleware for the next stage
            ERRORED: The controller will not invoke any middleware until the status changes,
                which would require codified resolution of the error outside of the controller

        :param target_stage: transfer_stage.* - The transfer stage to proceed to
        :type target_stage: str
        :param context: Override controller context, or provide it if missing
        :type context: morango.sync.context.SessionContext|None
        :return: transfer_status.* - The status of proceeding to that stage
        :rtype: str
        :raises ValueError: if no context was passed and the controller has none
        """
        if context is None:
            context = self.context

        if context is None:
            raise ValueError("Controller is missing required context object")

        target_stage = transfer_stages.stage(target_stage)

        # we can't 'proceed' to a stage we've already passed
        current_stage = transfer_stages.stage(context.stage)
        if current_stage > target_stage:
            return transfer_statuses.COMPLETED

        # See comments above, any of these statuses mean a no-op for proceeding
        if context.stage_status in (
            transfer_statuses.STARTED,
            transfer_statuses.ERRORED,
        ):
            return context.stage_status

        # stays False only when no middleware gets invoked below
        result = False
        # inside "session_middleware"
        for middleware in self.middleware:
            middleware_stage = transfer_stages.stage(middleware.related_stage)

            # break when we find middleware beyond proceed-to stage
            if middleware_stage > target_stage:
                break
            # execute middleware, up to and including the requested stage
            elif middleware_stage > current_stage or (
                context.stage_status == transfer_statuses.PENDING
                and middleware_stage == current_stage
            ):
                # if the result is not completed status, then break because that means we can't
                # proceed to the next stage (yet)
                result = self._invoke_middleware(context, middleware)
                # update local stage variable after completion
                current_stage = transfer_stages.stage(context.stage)
                if result != transfer_statuses.COMPLETED:
                    break

        # since the middleware must handle our request, or throw an unimplemented error, this
        # should always be a non-False status
        return result

    def proceed_to_and_wait_for(
        self, target_stage, context=None, max_interval=None, callback=None
    ):
        """
        Same as `proceed_to` but waits for a finished status to be returned by sleeping between
        calls to `proceed_to` if status is not complete

        :param target_stage: transfer_stage.* - The transfer stage to proceed to
        :type target_stage: str
        :param context: Override controller context, or provide it if missing
        :type context: morango.sync.context.SessionContext|None
        :param max_interval: The max time, in seconds, between repeat calls to `.proceed_to`;
            when falsy, the context's `max_backoff_interval` is used
        :param callback: A callable to invoke after every attempt
        :return: transfer_status.* - The status of proceeding to that stage,
            which should be `ERRORED` or `COMPLETE`
        :rtype: str
        :raises ValueError: if no context was passed and the controller has none
        """
        context = context or self.context
        # fail the same way `proceed_to` does, instead of with an attribute error on `None`
        if context is None:
            raise ValueError("Controller is missing required context object")

        result = transfer_statuses.PENDING
        tries = 0
        max_interval = max_interval or context.max_backoff_interval
        # solve for the number of tries at which our sleep time will always be max_interval
        max_interval_tries = math.ceil(math.log(max_interval / 0.3 + 1) / math.log(2))

        while result not in transfer_statuses.FINISHED_STATES:
            # the first attempt happens immediately, only later attempts are delayed
            if tries > 0:
                # exponential backoff up to max_interval
                if tries >= max_interval_tries:
                    sleep(max_interval)
                else:
                    # 0.3s, 0.9s, 2.1s, ...
                    sleep(0.3 * (2**tries - 1))
            result = self.proceed_to(target_stage, context=context)
            tries += 1
            if callable(callback):
                callback()
        return result

    def _invoke_middleware(self, context, middleware):
        """
        Invokes middleware, with logging if enabled, and handles updating the transfer session state

        Any exception raised while invoking is logged and stored on the context along with an
        errored status, instead of being propagated to the caller

        :param context: The context to invoke the middleware with
        :type context: morango.sync.context.SessionContext
        :type middleware: morango.registry.SessionMiddlewareOperations
        :return: transfer_status.* - The result of invoking the middleware
        :rtype: str
        """
        stage = middleware.related_stage
        signal = getattr(self.signals, stage)
        # captured before the context is updated below, to detect the first invocation for a stage
        at_stage = context.stage == stage
        prepared_context = None

        try:
            context.update(stage=stage, stage_status=transfer_statuses.PENDING)

            # we'll use the prepared context for passing to the middleware and any signal handlers
            prepared_context = context.prepare()

            # only fire "started" when we first try to invoke the stage
            # NOTE: this means that signals.started is not equivalent to transfer_stage.STARTED
            if not at_stage:
                signal.started.fire(context=prepared_context)

            # invoke the middleware with the prepared context
            result = middleware(prepared_context)

            # return the prepared context back
            context.join(prepared_context)

            # don't update stage result if context's stage was updated during operation
            if context.stage == stage:
                context.update(stage_status=result)

            # fire signals based off middleware invocation result; the progress signal if incomplete
            if result == transfer_statuses.COMPLETED:
                signal.completed.fire(context=prepared_context)
            else:
                signal.in_progress.fire(context=prepared_context)

            # context should take precedence over result, and was likely updated
            return context.stage_status
        except Exception as e:
            # always log the error itself
            logger.error(e)
            context.update(stage_status=transfer_statuses.ERRORED, error=e)
            # fire completed signal, after context update. handlers can use context to detect error
            # (falls back to the original context if the failure happened before preparing one)
            signal.completed.fire(context=prepared_context or context)
            return transfer_statuses.ERRORED