From 21c1f7b55c7144e0058d0d56dd206e3eaa6f8ad5 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Tue, 31 Mar 2026 03:33:56 +0000 Subject: [PATCH] fix(discovery-provider): skip malformed em metadata during prefetch Co-authored-by: Ray Jacobson --- .../entity_manager/test_index_skip_tx.py | 78 +++++++++++++++ .../tasks/entity_manager/entity_manager.py | 95 +++++++++---------- 2 files changed, 124 insertions(+), 49 deletions(-) diff --git a/packages/discovery-provider/integration_tests/tasks/entity_manager/test_index_skip_tx.py b/packages/discovery-provider/integration_tests/tasks/entity_manager/test_index_skip_tx.py index beca615224b..8be1ca0adfc 100644 --- a/packages/discovery-provider/integration_tests/tasks/entity_manager/test_index_skip_tx.py +++ b/packages/discovery-provider/integration_tests/tasks/entity_manager/test_index_skip_tx.py @@ -80,3 +80,81 @@ def get_events_side_effect(_, tx_receipt): ).first() assert skipped_transactions.txhash == "0x43726561746555736572" + + +def test_skip_malformed_prefetch_metadata(app, mocker): + """ + Ensures malformed metadata shapes in prefetch do not abort block indexing. + """ + mocker.patch( + "src.tasks.entity_manager.entity_manager.create_user", + side_effect=Exception("Skip tx error"), + autospec=True, + ) + + def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt["transactionHash"].decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + + with app.app_context(): + db = get_db() + web3 = Web3() + redis = get_redis() + challenge_event_bus: ChallengeEventBus = setup_challenge_bus() + update_task = UpdateTask(web3, challenge_event_bus, redis) + + tx_receipts = { + # json.loads succeeds, but metadata is not an object + "MalformedDeveloperApp": [ + { + "args": AttributeDict( + { + "_entityId": 1, + "_entityType": "DeveloperApp", + "_userId": 1, + "_action": "Create", + "_metadata": "[]", + "_signer": "user1wallet", + } + ) + }, + ], + # should still be processed after malformed metadata tx + "CreateUser": [ + { + "args": AttributeDict( + { + "_entityId": 1, + "_entityType": "User", + "_userId": 1, + "_action": "Create", + "_metadata": "", + "_signer": "user1wallet", + } + ) + }, + ], + } + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.to_bytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + populate_mock_db_blocks(db, 0, 1) + + with db.scoped_session() as session: + entity_manager_update( + update_task, + session, + entity_manager_txs, + block_number=0, + block_timestamp=1585336422, + block_hash=hex(0), + ) + skipped_transactions = session.query(SkippedTransaction).all() + assert len(skipped_transactions) == 1 diff --git a/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py b/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py index 4ee15941b7d..b4745c604d5 100644 --- a/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py +++ b/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py @@ -628,6 +628,24 @@ def copy_original_records(existing_records): ) +def parse_metadata_json_dict(metadata: str, action: str, entity_type: str): + try: + json_metadata = json.loads(metadata) + except Exception as e: + logger.error( + f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" + ) + return None + + if not isinstance(json_metadata, dict): + logger.error( + f"tasks | entity_manager.py | Invalid {action} {entity_type} metadata shape, expected object" + ) + return None + + return json_metadata + + def collect_entities_to_fetch(update_task, entity_manager_txs): entities_to_fetch: Dict[EntityType, Set] = defaultdict(set) @@ -662,13 +680,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): entities_to_fetch[EntityType.USER].add(user_id) if action != Action.DELETE: - try: - json_metadata = json.loads(metadata) - except Exception as e: - logger.error( - f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" - ) - # skip invalid metadata + json_metadata = parse_metadata_json_dict( + metadata, str(action), str(entity_type) + ) + if not json_metadata: continue event_entity_type = json_metadata.get("data", {}).get("entity_type") @@ -684,13 +699,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): or action == Action.UNPIN or action == Action.REACT ): - try: - json_metadata = json.loads(metadata) - except Exception as e: - logger.error( - f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" - ) - # skip invalid metadata + json_metadata = parse_metadata_json_dict( + metadata, str(action), str(entity_type) + ) + if not json_metadata: continue track_id = json_metadata.get("data", {}).get("entity_id") entities_to_fetch[EntityType.TRACK].add(track_id) @@ -734,13 +746,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): entities_to_fetch[EntityType.DEVELOPER_APP].add(signer.lower()) entities_to_fetch[EntityType.USER_WALLET].add(signer.lower()) if entity_type == EntityType.DEVELOPER_APP: - try: - json_metadata = json.loads(metadata) - except Exception as e: - logger.error( - f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" - ) - # skip invalid metadata + json_metadata = parse_metadata_json_dict( + metadata, str(action), str(entity_type) + ) + if not json_metadata: continue raw_address = json_metadata.get("address", None) @@ -763,13 +772,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): "tasks | entity_manager.py | Missing address or valid app signature in metadata required for add developer app tx" ) if entity_type == EntityType.GRANT: - try: - json_metadata = json.loads(metadata) - except Exception as e: - logger.error( - f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" - ) - # skip invalid metadata + json_metadata = parse_metadata_json_dict( + metadata, str(action), str(entity_type) + ) + if not json_metadata: continue raw_grantee_address = json_metadata.get("grantee_address", None) @@ -799,13 +805,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): (signer.lower(), raw_grantor_user_id) ) if entity_type == EntityType.DASHBOARD_WALLET_USER: - try: - json_metadata = json.loads(metadata) - except Exception as e: - logger.error( - f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" - ) - # skip invalid metadata + json_metadata = parse_metadata_json_dict( + metadata, str(action), str(entity_type) + ) + if not json_metadata: continue raw_wallet = json_metadata.get("wallet", None) @@ -858,13 +861,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): entities_to_fetch[EntityType.USER].add(user_id) if entity_type == EntityType.ENCRYPTED_EMAIL: - try: - json_metadata = json.loads(metadata) - except Exception as e: - logger.error( - f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" - ) - # skip invalid metadata + json_metadata = parse_metadata_json_dict( + metadata, str(action), str(entity_type) + ) + if not json_metadata: continue # Add email owner's record to fetch @@ -875,13 +875,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): ) if entity_type == EntityType.EMAIL_ACCESS and action == Action.UPDATE: - try: - json_metadata = json.loads(metadata) - except Exception as e: - logger.error( - f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}" - ) - # skip invalid metadata + json_metadata = parse_metadata_json_dict( + metadata, str(action), str(entity_type) + ) + if not json_metadata: continue # Add email access record to fetch