Skip to content

Commit b74f896

Browse files
authored
Merge pull request #284 from bjester/op-mod-con
Allow sync operations to modify the sync filter
2 parents 92278a5 + 4f04ab8 commit b74f896

9 files changed

Lines changed: 381 additions & 73 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ Pipfile
9696

9797
# Jetbrains IDE
9898
.idea
99+
/*.iml
99100

100101
*.db
101102

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
List of the most important changes for each release.
44

5+
## 0.8.6
6+
- Allows sync operations to modify the context, particularly the sync filter, as long as the original is a subset of the new filter
7+
- Deprecates usage of `Filter` for parameter replacement. Use `Filter.from_template` instead
8+
- Adds additional utilities to `Filter` class and defensive logic for various scenarios
9+
510
## 0.8.5
611
- Prevents MPTT corruption that occurs with concurrent certificate creation
712

morango/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.8.5"
1+
__version__ = "0.8.6"

morango/models/certificates.py

Lines changed: 103 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
and a ``public_key`` used for verifying that a certificate(s) was properly signed.
55
"""
66
import json
7-
import string
87
import logging
8+
import string
9+
from contextlib import contextmanager
910

1011
import mptt.models
1112
from django.core.management import call_command
13+
from django.db import connection
1214
from django.db import models
1315
from django.db import transaction
16+
from django.db.utils import OperationalError
1417
from django.utils import timezone
1518

1619
from .fields.crypto import Key
@@ -24,11 +27,9 @@
2427
from morango.errors import CertificateSignatureInvalid
2528
from morango.errors import NonceDoesNotExist
2629
from morango.errors import NonceExpired
27-
from morango.utils import _assert
28-
from django.db import transaction, connection
2930
from morango.sync.backends.utils import load_backend
30-
from contextlib import contextmanager
31-
from django.db.utils import OperationalError
31+
from morango.utils import _assert
32+
3233

3334
class Certificate(mptt.models.MPTTModel, UUIDModelMixin):
3435

@@ -360,45 +361,89 @@ def get_description(self, params):
360361

361362

362363
class Filter(object):
363-
def __init__(self, template, params={}):
364-
# ensure params have been deserialized
365-
if isinstance(params, str):
366-
params = json.loads(params)
367-
self._template = template
368-
self._params = params
369-
self._filter_string = string.Template(template).safe_substitute(params)
370-
self._filter_tuple = tuple(self._filter_string.split()) or ("",)
364+
def __init__(self, filter_str, params=None):
365+
"""
366+
:param filter_str: The partition filter string, which may have multiple separated by newlines
367+
:type filter_str: str
368+
:param params: DEPRECATED: USE Filter.from_template() INSTEAD
369+
:type params: dict|str
370+
"""
371+
if params is not None:
372+
logging.warning("DEPRECATED: Constructing a filter with a template and params is deprecated. Use Filter.from_template() instead")
373+
filter_str = str(Filter.from_template(filter_str, params=params))
374+
375+
self._filter_tuple = tuple(filter_str.split()) or ("",)
371376

372377
def is_subset_of(self, other):
373-
for partition in self._filter_tuple:
374-
if not partition.startswith(other._filter_tuple):
378+
"""
379+
:param other: The other Filter
380+
:type other: Filter
381+
:return: A boolean on whether this Filter is captured within the other Filter
382+
:rtype: bool
383+
"""
384+
for partition in self:
385+
if not other.contains_partition(partition):
375386
return False
376387
return True
377388

378389
def contains_partition(self, partition):
390+
"""Returns True if the partition starts with as least one of the partitions in this Filter"""
379391
return partition.startswith(self._filter_tuple)
380392

393+
def contains_exact_partition(self, partition):
394+
"""Returns True if the partition exactly matches one of the partitions in this Filter"""
395+
return partition in self._filter_tuple
396+
397+
def copy(self):
398+
return Filter(str(self))
399+
381400
def __le__(self, other):
401+
"""Returns True if this Filter is a subset of the other"""
382402
return self.is_subset_of(other)
383403

384404
def __eq__(self, other):
405+
"""Returns True if this Filter has exactly the same partitions as the other"""
385406
if other is None:
386407
return False
387-
for partition in self._filter_tuple:
388-
if partition not in other._filter_tuple:
408+
for partition in self:
409+
if not other.contains_exact_partition(partition):
389410
return False
390-
for partition in other._filter_tuple:
391-
if partition not in self._filter_tuple:
411+
for partition in other:
412+
if not self.contains_exact_partition(partition):
392413
return False
393414
return True
394415

395416
def __contains__(self, partition):
417+
"""
418+
Performs a 'startswith' comparison on the partition, determining whether it matches or
419+
is a subset of any partition in this Filter
420+
421+
:param partition: str
422+
:return: A boolean
423+
:rtype: bool
424+
"""
396425
return self.contains_partition(partition)
397426

398427
def __add__(self, other):
399-
return Filter(self._filter_string + "\n" + other._filter_string)
428+
"""
429+
The Filter's addition operator overload
430+
:param other: Filter or None
431+
:type other: Filter|None
432+
:return: The combined Filter
433+
:rtype: Filter
434+
"""
435+
if other is None:
436+
return self
437+
# create a list of partition filters, deduplicating them between the two filter objects
438+
partitions = []
439+
partitions.extend(p for p in self if p)
440+
partitions.extend(p for p in other if p and p not in partitions)
441+
return Filter("\n".join(partitions))
400442

401443
def __iter__(self):
444+
"""
445+
:rtype: tuple[str]
446+
"""
402447
return iter(self._filter_tuple)
403448

404449
def __str__(self):
@@ -407,13 +452,48 @@ def __str__(self):
407452
def __len__(self):
408453
return len(self._filter_tuple)
409454

455+
@classmethod
456+
def add(cls, filter_a, filter_b):
457+
"""
458+
The Filter's addition operator overload is already defensive against None being the
459+
right-hand operand, but this method is defensive against None being the left-hand operand
460+
461+
:param filter_a: A Filter or None
462+
:type filter_a: Filter|None
463+
:param filter_b: A Filter or None
464+
:type filter_b: Filter|None
465+
:return: The combined Filter or None
466+
:rtype: Filter|None
467+
"""
468+
if filter_a is None:
469+
return filter_b
470+
return filter_a + filter_b
471+
472+
@classmethod
473+
def from_template(cls, template, params=None):
474+
"""
475+
Create a filter from a string template, which may have params that will be replaced with
476+
values passed to `params`
477+
478+
:param template: The partition filter template
479+
:type template: str
480+
:param params: The param dictionary or JSON object string
481+
:type params: dict|str
482+
:return: The filter with params replaced
483+
:rtype: Filter
484+
"""
485+
if isinstance(params, str):
486+
params = json.loads(params)
487+
params = params or {}
488+
return Filter(string.Template(template).safe_substitute(params))
489+
410490

411491
class Scope(object):
412492
def __init__(self, definition, params):
413493
# turn the scope definition filter templates into Filter objects
414-
rw_filter = Filter(definition.read_write_filter_template, params)
415-
self.read_filter = rw_filter + Filter(definition.read_filter_template, params)
416-
self.write_filter = rw_filter + Filter(definition.write_filter_template, params)
494+
rw_filter = Filter.from_template(definition.read_write_filter_template, params)
495+
self.read_filter = rw_filter + Filter.from_template(definition.read_filter_template, params)
496+
self.write_filter = rw_filter + Filter.from_template(definition.write_filter_template, params)
417497

418498
def is_subset_of(self, other):
419499
if not self.read_filter.is_subset_of(other.read_filter):

morango/sync/context.py

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ def prepare(self):
6363
"""
6464
return self
6565

66+
def join(self, context):
67+
"""
68+
Perform any processing of session context after passing it to the middleware, which will
69+
receive the result returned by `prepare` after that has happened
70+
"""
71+
pass
72+
6673
def update(
6774
self,
6875
transfer_session=None,
@@ -83,7 +90,7 @@ def update(
8390
:type capabilities: str[]|None
8491
:type error: BaseException|None
8592
"""
86-
if transfer_session and self.transfer_session:
93+
if transfer_session and self.transfer_session and transfer_session.id != self.transfer_session.id:
8794
raise MorangoContextUpdateError("Transfer session already exists")
8895
elif (
8996
transfer_session
@@ -92,14 +99,18 @@ def update(
9299
):
93100
raise MorangoContextUpdateError("Sync session mismatch")
94101

95-
if sync_filter and self.filter:
96-
raise MorangoContextUpdateError("Filter already exists")
102+
if sync_filter and self.filter and sync_filter != self.filter:
103+
if not self.filter.is_subset_of(sync_filter):
104+
raise MorangoContextUpdateError("The existing filter must be a subset of the new filter")
105+
if transfer_stages.stage(self.stage) > transfer_stages.stage(transfer_stages.INITIALIZING):
106+
raise MorangoContextUpdateError("Cannot update filter after initializing stage")
97107

98108
if is_push is not None and self.is_push is not None:
99109
raise MorangoContextUpdateError("Push/pull method already exists")
100110

101111
self.transfer_session = transfer_session or self.transfer_session
102-
self.filter = sync_filter or self.filter
112+
if sync_filter or self.filter:
113+
self.filter = Filter.add(self.filter, sync_filter)
103114
self.is_push = is_push if is_push is not None else self.is_push
104115
self.capabilities = set(capabilities or self.capabilities) & CAPABILITIES
105116
self.update_state(stage=stage, stage_status=stage_status)
@@ -402,50 +413,47 @@ def prepare(self):
402413
"""
403414
return self.children[self._counter % len(self.children)]
404415

416+
def join(self, context):
417+
"""
418+
This updates some context attributes that really only make sense logically for what an
419+
operation might change during a sync
420+
421+
:param context: The context that was returned previously from `prepare`
422+
:type context: SessionContext
423+
"""
424+
updates = {}
425+
if not self.transfer_session and context.transfer_session:
426+
# if the transfer session is being resumed, we'd detect a different stage here,
427+
# and thus we reset the counter, so we can be sure to start fresh at that stage
428+
# on the next invocation of the middleware
429+
if (
430+
context.transfer_session.transfer_stage
431+
and self._stage
432+
and context.transfer_session.transfer_stage != self._stage
433+
):
434+
self._counter = 0
435+
updates.update(
436+
stage=context.transfer_session.transfer_stage,
437+
stage_status=transfer_statuses.PENDING,
438+
)
439+
updates.update(transfer_session=context.transfer_session)
440+
if self.filter != context.filter:
441+
updates.update(sync_filter=context.filter)
442+
if self.capabilities != context.capabilities:
443+
updates.update(capabilities=context.capabilities)
444+
if self.error != context.error:
445+
updates.update(error=context.error)
446+
if updates:
447+
self.update(**updates)
448+
405449
def update(self, stage=None, stage_status=None, **kwargs):
406-
"""
407-
Updates the context object and its state
408-
:param stage: The str transfer stage
409-
:param stage_status: The str transfer stage status
410-
:param kwargs: Other arguments to update the context with
411-
"""
412450
# update ourselves, but exclude stage and stage_status
413451
super(CompositeSessionContext, self).update(**kwargs)
414452
# update children contexts directly, but exclude stage and stage_status
415453
self._update_attrs(**kwargs)
416454
# handle state changes after updating children
417455
self.update_state(stage=stage, stage_status=stage_status)
418456

419-
# During the initializing stage, we want to make sure to synchronize the transfer session
420-
# object between the composite and children contexts, using whatever child context's
421-
# transfer session object that was updated on the context during initialization
422-
current_stage = stage or self._stage
423-
if not self.transfer_session and current_stage == transfer_stages.INITIALIZING:
424-
try:
425-
transfer_session = next(
426-
c.transfer_session for c in self.children if c.transfer_session
427-
)
428-
# prepare an updates dictionary, so we can update everything at once
429-
updates = dict(transfer_session=transfer_session)
430-
431-
# if the transfer session is being resumed, we'd detect a different stage here,
432-
# and thus we reset the counter, so we can be sure to start fresh at that stage
433-
# on the next invocation of the middleware
434-
if (
435-
transfer_session.transfer_stage
436-
and transfer_session.transfer_stage != current_stage
437-
):
438-
self._counter = 0
439-
updates.update(
440-
stage=transfer_session.transfer_stage,
441-
stage_status=transfer_statuses.PENDING,
442-
)
443-
444-
# recurse into update with transfer session and possibly state updates too
445-
self.update(**updates)
446-
except StopIteration:
447-
pass
448-
449457
def update_state(self, stage=None, stage_status=None):
450458
"""
451459
Updates the state of the transfer

morango/sync/controller.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,9 @@ def _invoke_middleware(self, context, middleware):
253253
# invoke the middleware with the prepared context
254254
result = middleware(prepared_context)
255255

256+
# return the prepared context back
257+
context.join(prepared_context)
258+
256259
# don't update stage result if context's stage was updated during operation
257260
if context.stage == stage:
258261
context.update(stage_status=result)

0 commit comments

Comments
 (0)