From 82807177ecf5900fafd25c354e549170d01f5a89 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:36:32 +0000 Subject: [PATCH 1/5] Initial plan From 270885971e5b83029ee1b72acc8cba295cc127cb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:46:01 +0000 Subject: [PATCH 2/5] Implement nested field encryption for dict fields Co-authored-by: jcarlosdev <8920902+jcarlosdev@users.noreply.github.com> --- ming/__init__.py | 1 + ming/encryption.py | 171 ++++++++++++++++++++++++++++++++++++++++++++- ming/metadata.py | 15 +++- 3 files changed, 185 insertions(+), 2 deletions(-) diff --git a/ming/__init__.py b/ming/__init__.py index 914efd3..62b4600 100644 --- a/ming/__init__.py +++ b/ming/__init__.py @@ -7,6 +7,7 @@ from ming.version import __version__, __version_info__ from ming.config import configure from ming.datastore import create_engine, create_datastore +from ming.encryption import EncryptedObject # Re-export direction keys ASCENDING = pymongo.ASCENDING diff --git a/ming/encryption.py b/ming/encryption.py index aec597f..0b492cc 100644 --- a/ming/encryption.py +++ b/ming/encryption.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, TypeVar, Generic +from typing import TYPE_CHECKING, TypeVar, Generic, Any from ming.utils import classproperty import ming.schema @@ -87,6 +87,131 @@ def key_vault_namespace(self) -> str: T = TypeVar('T') +class EncryptedObject(dict): + """A dict-like wrapper that handles encryption/decryption for nested fields. + + This class wraps a regular dict and provides transparent encryption/decryption + when accessing fields that have _encrypted counterparts. + """ + + def __init__(self, data: dict, encr_func, decr_func, field_schema: dict = None): + """ + :param data: The underlying dict data + :param encr_func: Function to encrypt data (str -> bytes) + :param decr_func: Function to decrypt data (bytes -> str) + :param field_schema: Dict mapping field names to their schemas (for nested dicts) + """ + super().__init__(data) + self._encr_func = encr_func + self._decr_func = decr_func + self._field_schema = field_schema or {} + + # Wrap any nested dicts that have encrypted fields + self._wrap_nested_dicts() + + def _wrap_nested_dicts(self): + """Wrap nested dicts with EncryptedObject if they contain encrypted fields.""" + for key, value in list(self.items()): + if isinstance(value, dict) and not isinstance(value, EncryptedObject): + # Check if this dict has any encrypted fields + if self._has_encrypted_fields(value): + nested_schema = self._field_schema.get(key, {}) + self[key] = EncryptedObject(value, self._encr_func, self._decr_func, nested_schema) + + def _has_encrypted_fields(self, d: dict) -> bool: + """Check if a dict has any fields ending with _encrypted.""" + return any(k.endswith('_encrypted') for k in d.keys()) + + def _get_encrypted_field_name(self, key: str) -> str: + """Get the encrypted field name for a decrypted field.""" + return f"{key}_encrypted" + + def _is_encrypted_field(self, key: str) -> bool: + """Check if a field is an encrypted field (ends with _encrypted).""" + return key.endswith('_encrypted') + + def _get_decrypted_field_name(self, key: str) -> str: + """Get the decrypted field name from an encrypted field.""" + if key.endswith('_encrypted'): + return key[:-10] # Remove '_encrypted' suffix + return key + + def __getitem__(self, key: str) -> Any: + """Get item with automatic decryption if accessing a decrypted field.""" + # If accessing an encrypted field directly, return as-is + if self._is_encrypted_field(key): + return super().__getitem__(key) + + # Check if there's an encrypted counterpart + encrypted_key = self._get_encrypted_field_name(key) + if encrypted_key in self: + # This is a decrypted field - decrypt the encrypted value + encrypted_value = super().__getitem__(encrypted_key) + return self._decr_func(encrypted_value) + + # Regular field access + value = super().__getitem__(key) + + # If the value is a dict with encrypted fields, wrap it + if isinstance(value, dict) and not isinstance(value, EncryptedObject): + if self._has_encrypted_fields(value): + nested_schema = self._field_schema.get(key, {}) + value = EncryptedObject(value, self._encr_func, self._decr_func, nested_schema) + super().__setitem__(key, value) + + return value + + def __setitem__(self, key: str, value: Any): + """Set item with automatic encryption if setting a decrypted field.""" + # If setting an encrypted field directly, set as-is + if self._is_encrypted_field(key): + super().__setitem__(key, value) + return + + # Check if there's an encrypted counterpart + encrypted_key = self._get_encrypted_field_name(key) + if encrypted_key in self: + # This is a decrypted field - encrypt the value and store in encrypted field + if value is not None: + encrypted_value = self._encr_func(value) + super().__setitem__(encrypted_key, encrypted_value) + else: + super().__setitem__(encrypted_key, None) + # Don't store the decrypted value + return + + # Regular field - just set it + # If value is a dict with encrypted fields, wrap it + if isinstance(value, dict) and not isinstance(value, EncryptedObject): + if self._has_encrypted_fields(value): + nested_schema = self._field_schema.get(key, {}) + value = EncryptedObject(value, self._encr_func, self._decr_func, nested_schema) + + super().__setitem__(key, value) + + def get(self, key: str, default=None) -> Any: + """Get with default, handling decryption.""" + try: + return self[key] + except KeyError: + return default + + def __getattr__(self, name: str) -> Any: + """Support attribute access like obj.field_name.""" + try: + return self[name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name: str, value: Any): + """Support attribute setting like obj.field_name = value.""" + # Handle internal attributes + if name.startswith('_'): + super().__setattr__(name, value) + else: + self[name] = value + + class DecryptedField(Generic[T]): def __init__(self, field_type: type[T], encrypted_field: str): @@ -130,6 +255,9 @@ class EncryptedMixin: Generally, don't use this directly, but instead call the methods on the Document/MappedClass you're working with. """ + + # Make EncryptedObject accessible as a class attribute + EncryptedObject = EncryptedObject @classproperty def _datastore(cls) -> ming.datastore.DataStore: @@ -204,10 +332,51 @@ def encrypt_some_fields(cls, data: dict) -> dict: :return: a modified copy of the ``data`` param with the currently-unencrypted-but-encryptable fields replaced with ``_encrypted`` counterparts. """ encrypted_data = data.copy() + + # Encrypt top-level decrypted fields for fld in cls.decrypted_field_names(): if fld in encrypted_data: val = encrypted_data.pop(fld) encrypted_data[f'{fld}_encrypted'] = cls.encr(val) + + # Handle nested dicts - recursively encrypt fields in dict values + for key, value in list(encrypted_data.items()): + if isinstance(value, dict) and key in cls.m.field_index: + field = cls.m.field_index[key] + if hasattr(field.schema, 'fields'): + # This is an Object schema with defined fields + encrypted_data[key] = cls._encrypt_nested_dict(value, field.schema.fields) + + return encrypted_data + + @classmethod + def _encrypt_nested_dict(cls, data: dict, schema_fields: dict) -> dict: + """Recursively encrypt fields in a nested dict based on schema. + + :param data: The dict data to encrypt + :param schema_fields: The schema fields definition for this dict level + """ + encrypted_data = data.copy() + + # Find which fields in the schema are encrypted fields (end with _encrypted) + encrypted_field_names = [k for k in schema_fields.keys() if k.endswith('_encrypted')] + + # For each encrypted field, check if we have the decrypted version in data + for encrypted_field in encrypted_field_names: + decrypted_field = encrypted_field[:-10] # Remove '_encrypted' + + if decrypted_field in encrypted_data: + # We have the decrypted version - encrypt it + val = encrypted_data.pop(decrypted_field) + encrypted_data[encrypted_field] = cls.encr(val) + + # Recursively handle nested dicts + for key, value in list(encrypted_data.items()): + if isinstance(value, dict) and key in schema_fields: + nested_schema = schema_fields[key] + if hasattr(nested_schema, 'fields'): + encrypted_data[key] = cls._encrypt_nested_dict(value, nested_schema.fields) + return encrypted_data def decrypt_some_fields(self) -> dict: diff --git a/ming/metadata.py b/ming/metadata.py index f16dbdb..631ca67 100644 --- a/ming/metadata.py +++ b/ming/metadata.py @@ -391,9 +391,22 @@ def __init__(self, field): def __get__(self, inst, cls=None): if inst is None: return self try: - return inst[self.name] + value = inst[self.name] + # If the value is a dict with encrypted fields, wrap it with EncryptedObject + if isinstance(value, dict) and not isinstance(value, EncryptedMixin.EncryptedObject): + if self._has_encrypted_fields(value): + from .encryption import EncryptedObject + # Get encryption functions from the document instance + value = EncryptedObject(value, inst.encr, inst.decr) + # Store the wrapped value back + inst[self.name] = value + return value except KeyError: raise AttributeError(self.name) + + def _has_encrypted_fields(self, d: dict) -> bool: + """Check if a dict has any fields ending with _encrypted.""" + return any(k.endswith('_encrypted') for k in d.keys()) def __set__(self, inst, value): inst[self.name] = value From ebfaa56ed993bd6a12732f782cdd4f72033df5e8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:49:27 +0000 Subject: [PATCH 3/5] Add comprehensive tests for nested dict encryption Co-authored-by: jcarlosdev <8920902+jcarlosdev@users.noreply.github.com> --- ming/metadata.py | 16 ++-- ming/tests/test_encryption.py | 136 ++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 5 deletions(-) diff --git a/ming/metadata.py b/ming/metadata.py index 631ca67..cc4ccc3 100644 --- a/ming/metadata.py +++ b/ming/metadata.py @@ -392,9 +392,9 @@ def __get__(self, inst, cls=None): if inst is None: return self try: value = inst[self.name] - # If the value is a dict with encrypted fields, wrap it with EncryptedObject + # If the value is a dict with encrypted fields (direct or nested), wrap it with EncryptedObject if isinstance(value, dict) and not isinstance(value, EncryptedMixin.EncryptedObject): - if self._has_encrypted_fields(value): + if self._has_encrypted_fields_recursive(value): from .encryption import EncryptedObject # Get encryption functions from the document instance value = EncryptedObject(value, inst.encr, inst.decr) @@ -404,9 +404,15 @@ def __get__(self, inst, cls=None): except KeyError: raise AttributeError(self.name) - def _has_encrypted_fields(self, d: dict) -> bool: - """Check if a dict has any fields ending with _encrypted.""" - return any(k.endswith('_encrypted') for k in d.keys()) + def _has_encrypted_fields_recursive(self, d: dict) -> bool: + """Check if a dict has any fields ending with _encrypted, recursively.""" + for k, v in d.items(): + if k.endswith('_encrypted'): + return True + if isinstance(v, dict): + if self._has_encrypted_fields_recursive(v): + return True + return False def __set__(self, inst, value): inst[self.name] = value diff --git a/ming/tests/test_encryption.py b/ming/tests/test_encryption.py index 2ab6e7b..aef29f9 100644 --- a/ming/tests/test_encryption.py +++ b/ming/tests/test_encryption.py @@ -281,6 +281,142 @@ class __mongometa__: self.assertEqual(doc.name, None) self.assertEqual(doc.name_encrypted, None) + def test_nested_dict_encryption(self): + """Test encryption of fields nested in dict fields.""" + from bson import ObjectId + + class UserDoc(Document): + class __mongometa__: + name = 'user_doc' + session = ming.Session.by_name('test_db') + + _id = Field(S.ObjectId) + username = Field(str) + # Dict field with encrypted nested fields + full_name = Field(dict( + first_name_encrypted=S.Binary, + last_name_encrypted=S.Binary + )) + + # Create document with unencrypted nested data + doc = UserDoc.make_encr({ + '_id': ObjectId(), + 'username': 'jdoe', + 'full_name': { + 'first_name': 'John', + 'last_name': 'Doe' + } + }) + doc.m.save() + + # Verify encrypted fields exist in storage + self.assertIn('first_name_encrypted', doc.full_name) + self.assertIn('last_name_encrypted', doc.full_name) + self.assertIsInstance(doc.full_name['first_name_encrypted'], bytes) + self.assertIsInstance(doc.full_name['last_name_encrypted'], bytes) + + # Verify decryption works through dict access + self.assertEqual(doc.full_name['first_name'], 'John') + self.assertEqual(doc.full_name['last_name'], 'Doe') + + # Verify we can set nested encrypted fields + doc.full_name['first_name'] = 'Johnny' + self.assertEqual(doc.full_name['first_name'], 'Johnny') + self.assertNotEqual(doc.full_name['first_name_encrypted'], UserDoc.encr('John')) + self.assertEqual(doc.full_name['first_name_encrypted'], UserDoc.encr('Johnny')) + + # Verify the document can be saved and retrieved + doc.m.save() + retrieved = UserDoc.m.get(_id=doc._id) + self.assertEqual(retrieved.full_name['first_name'], 'Johnny') + self.assertEqual(retrieved.full_name['last_name'], 'Doe') + + def test_nested_dict_encryption_multiple_levels(self): + """Test encryption of fields nested multiple levels deep.""" + from bson import ObjectId + + class ProfileDoc(Document): + class __mongometa__: + name = 'profile_doc' + session = ming.Session.by_name('test_db') + + _id = Field(S.ObjectId) + # Nested dict with encrypted fields + personal_info = Field(dict( + address=dict( + street_encrypted=S.Binary, + city_encrypted=S.Binary + ) + )) + + # Create document with multi-level nested unencrypted data + doc = ProfileDoc.make_encr({ + '_id': ObjectId(), + 'personal_info': { + 'address': { + 'street': '123 Main St', + 'city': 'Springfield' + } + } + }) + doc.m.save() + + # Verify nested encrypted fields exist + self.assertIn('street_encrypted', doc.personal_info['address']) + self.assertIn('city_encrypted', doc.personal_info['address']) + + # Verify decryption works at multiple levels + self.assertEqual(doc.personal_info['address']['street'], '123 Main St') + self.assertEqual(doc.personal_info['address']['city'], 'Springfield') + + # Verify setting nested values works + doc.personal_info['address']['city'] = 'Shelbyville' + self.assertEqual(doc.personal_info['address']['city'], 'Shelbyville') + + doc.m.save() + retrieved = ProfileDoc.m.get(_id=doc._id) + self.assertEqual(retrieved.personal_info['address']['city'], 'Shelbyville') + + def test_nested_dict_mixed_encrypted_and_plain_fields(self): + """Test dict with both encrypted and plain fields.""" + from bson import ObjectId + + class ContactDoc(Document): + class __mongometa__: + name = 'contact_doc' + session = ming.Session.by_name('test_db') + + _id = Field(S.ObjectId) + contact = Field(dict( + email_encrypted=S.Binary, + phone_encrypted=S.Binary, + public_name=str # This field is not encrypted + )) + + doc = ContactDoc.make_encr({ + '_id': ObjectId(), + 'contact': { + 'email': 'john@example.com', + 'phone': '555-1234', + 'public_name': 'John D.' + } + }) + doc.m.save() + + # Verify encrypted fields work + self.assertEqual(doc.contact['email'], 'john@example.com') + self.assertEqual(doc.contact['phone'], '555-1234') + + # Verify plain field works normally + self.assertEqual(doc.contact['public_name'], 'John D.') + + # Verify setting works for both types + doc.contact['email'] = 'john.doe@example.com' + doc.contact['public_name'] = 'Johnny D.' + + self.assertEqual(doc.contact['email'], 'john.doe@example.com') + self.assertEqual(doc.contact['public_name'], 'Johnny D.') + class TestDocumentEncryptionMimAutoSettings(TestDocumentEncryption): def setUp(self): # replace super() NOT using it From a1383234e4d876d6c9bc5c3320368e77f3ee4b97 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:52:18 +0000 Subject: [PATCH 4/5] Add comprehensive documentation for nested field encryption Co-authored-by: jcarlosdev <8920902+jcarlosdev@users.noreply.github.com> --- ming/encryption.py | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/ming/encryption.py b/ming/encryption.py index 0b492cc..c057d8a 100644 --- a/ming/encryption.py +++ b/ming/encryption.py @@ -92,6 +92,90 @@ class EncryptedObject(dict): This class wraps a regular dict and provides transparent encryption/decryption when accessing fields that have _encrypted counterparts. + + This is automatically applied to dict fields that contain encrypted fields, + enabling nested field-level encryption in MongoDB documents. + + **Example Usage:** + + Define a document with nested encrypted fields: + + .. code-block:: python + + class User(Document): + class __mongometa__: + name = 'user' + session = my_session + + _id = Field(schema.ObjectId) + username = Field(str) + # Dict field with encrypted nested fields + full_name = Field(dict( + first_name_encrypted=schema.Binary, + last_name_encrypted=schema.Binary + )) + + Create a document with unencrypted nested data: + + .. code-block:: python + + user = User.make_encr({ + '_id': ObjectId(), + 'username': 'jdoe', + 'full_name': { + 'first_name': 'John', + 'last_name': 'Doe' + } + }) + user.m.save() + + Access decrypted values using dict notation: + + .. code-block:: python + + # Get decrypted values + print(user.full_name['first_name']) # 'John' + print(user.full_name['last_name']) # 'Doe' + + # Set new encrypted values + user.full_name['first_name'] = 'Johnny' + user.m.save() + + **How it Works:** + + 1. When you define a dict field with fields ending in ``_encrypted`` (e.g., ``first_name_encrypted``), + the system recognizes these as encrypted fields. + + 2. When creating a document with ``make_encr()``, any nested fields that have corresponding + ``_encrypted`` fields in the schema are automatically encrypted. + + 3. When you access a field without the ``_encrypted`` suffix (e.g., ``'first_name'``), + EncryptedObject automatically decrypts the value from the ``first_name_encrypted`` field. + + 4. When you set a field without the ``_encrypted`` suffix, EncryptedObject automatically + encrypts the value and stores it in the corresponding ``_encrypted`` field. + + **Multi-level Nesting:** + + This works recursively for any level of nesting: + + .. code-block:: python + + class Profile(Document): + personal_info = Field(dict( + address=dict( + street_encrypted=schema.Binary, + city_encrypted=schema.Binary + ) + )) + + # Access deeply nested encrypted fields + profile.personal_info['address']['street'] = '123 Main St' + + :param data: The underlying dict data + :param encr_func: Function to encrypt data (str -> bytes) + :param decr_func: Function to decrypt data (bytes -> str) + :param field_schema: Dict mapping field names to their schemas (for nested dicts) """ def __init__(self, data: dict, encr_func, decr_func, field_schema: dict = None): From 2a73865ec1dc5a6c927e1dfdaf0a4bb1df380b75 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:54:32 +0000 Subject: [PATCH 5/5] Address code review feedback and improve error handling Co-authored-by: jcarlosdev <8920902+jcarlosdev@users.noreply.github.com> --- ming/encryption.py | 17 +++++++++-------- ming/metadata.py | 4 ++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/ming/encryption.py b/ming/encryption.py index c057d8a..bd6a8c4 100644 --- a/ming/encryption.py +++ b/ming/encryption.py @@ -195,7 +195,7 @@ def __init__(self, data: dict, encr_func, decr_func, field_schema: dict = None): def _wrap_nested_dicts(self): """Wrap nested dicts with EncryptedObject if they contain encrypted fields.""" - for key, value in list(self.items()): + for key, value in self.items(): if isinstance(value, dict) and not isinstance(value, EncryptedObject): # Check if this dict has any encrypted fields if self._has_encrypted_fields(value): @@ -424,12 +424,13 @@ def encrypt_some_fields(cls, data: dict) -> dict: encrypted_data[f'{fld}_encrypted'] = cls.encr(val) # Handle nested dicts - recursively encrypt fields in dict values - for key, value in list(encrypted_data.items()): - if isinstance(value, dict) and key in cls.m.field_index: - field = cls.m.field_index[key] - if hasattr(field.schema, 'fields'): - # This is an Object schema with defined fields - encrypted_data[key] = cls._encrypt_nested_dict(value, field.schema.fields) + if hasattr(cls, 'm') and hasattr(cls.m, 'field_index') and cls.m.field_index: + for key, value in encrypted_data.items(): + if isinstance(value, dict) and key in cls.m.field_index: + field = cls.m.field_index[key] + if hasattr(field, 'schema') and hasattr(field.schema, 'fields'): + # This is an Object schema with defined fields + encrypted_data[key] = cls._encrypt_nested_dict(value, field.schema.fields) return encrypted_data @@ -455,7 +456,7 @@ def _encrypt_nested_dict(cls, data: dict, schema_fields: dict) -> dict: encrypted_data[encrypted_field] = cls.encr(val) # Recursively handle nested dicts - for key, value in list(encrypted_data.items()): + for key, value in encrypted_data.items(): if isinstance(value, dict) and key in schema_fields: nested_schema = schema_fields[key] if hasattr(nested_schema, 'fields'): diff --git a/ming/metadata.py b/ming/metadata.py index cc4ccc3..6fe2dd4 100644 --- a/ming/metadata.py +++ b/ming/metadata.py @@ -393,9 +393,9 @@ def __get__(self, inst, cls=None): try: value = inst[self.name] # If the value is a dict with encrypted fields (direct or nested), wrap it with EncryptedObject - if isinstance(value, dict) and not isinstance(value, EncryptedMixin.EncryptedObject): + from .encryption import EncryptedObject + if isinstance(value, dict) and not isinstance(value, EncryptedObject): if self._has_encrypted_fields_recursive(value): - from .encryption import EncryptedObject # Get encryption functions from the document instance value = EncryptedObject(value, inst.encr, inst.decr) # Store the wrapped value back