Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,81 @@ def get_events_side_effect(_, tx_receipt):
).first()

assert skipped_transactions.txhash == "0x43726561746555736572"


def test_skip_malformed_prefetch_metadata(app, mocker):
"""
Ensures malformed metadata shapes in prefetch do not abort block indexing.
"""
mocker.patch(
"src.tasks.entity_manager.entity_manager.create_user",
side_effect=Exception("Skip tx error"),
autospec=True,
)

def get_events_side_effect(_, tx_receipt):
return tx_receipts[tx_receipt["transactionHash"].decode("utf-8")]

mocker.patch(
"src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx",
side_effect=get_events_side_effect,
autospec=True,
)

with app.app_context():
db = get_db()
web3 = Web3()
redis = get_redis()
challenge_event_bus: ChallengeEventBus = setup_challenge_bus()
update_task = UpdateTask(web3, challenge_event_bus, redis)

tx_receipts = {
# json.loads succeeds, but metadata is not an object
"MalformedDeveloperApp": [
{
"args": AttributeDict(
{
"_entityId": 1,
"_entityType": "DeveloperApp",
"_userId": 1,
"_action": "Create",
"_metadata": "[]",
"_signer": "user1wallet",
}
)
},
],
# should still be processed after malformed metadata tx
"CreateUser": [
{
"args": AttributeDict(
{
"_entityId": 1,
"_entityType": "User",
"_userId": 1,
"_action": "Create",
"_metadata": "",
"_signer": "user1wallet",
}
)
},
],
}

entity_manager_txs = [
AttributeDict({"transactionHash": update_task.web3.to_bytes(text=tx_receipt)})
for tx_receipt in tx_receipts
]
populate_mock_db_blocks(db, 0, 1)

with db.scoped_session() as session:
entity_manager_update(
update_task,
session,
entity_manager_txs,
block_number=0,
block_timestamp=1585336422,
block_hash=hex(0),
)
skipped_transactions = session.query(SkippedTransaction).all()
assert len(skipped_transactions) == 1
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,24 @@ def copy_original_records(existing_records):
)


def parse_metadata_json_dict(metadata: str, action: str, entity_type: str):
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
return None

if not isinstance(json_metadata, dict):
logger.error(
f"tasks | entity_manager.py | Invalid {action} {entity_type} metadata shape, expected object"
)
return None

return json_metadata


def collect_entities_to_fetch(update_task, entity_manager_txs):
entities_to_fetch: Dict[EntityType, Set] = defaultdict(set)

Expand Down Expand Up @@ -662,13 +680,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs):
entities_to_fetch[EntityType.USER].add(user_id)

if action != Action.DELETE:
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
# skip invalid metadata
json_metadata = parse_metadata_json_dict(
metadata, str(action), str(entity_type)
)
if not json_metadata:
continue

event_entity_type = json_metadata.get("data", {}).get("entity_type")
Expand All @@ -684,13 +699,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs):
or action == Action.UNPIN
or action == Action.REACT
):
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
# skip invalid metadata
json_metadata = parse_metadata_json_dict(
metadata, str(action), str(entity_type)
)
if not json_metadata:
continue
track_id = json_metadata.get("data", {}).get("entity_id")
entities_to_fetch[EntityType.TRACK].add(track_id)
Expand Down Expand Up @@ -734,13 +746,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs):
entities_to_fetch[EntityType.DEVELOPER_APP].add(signer.lower())
entities_to_fetch[EntityType.USER_WALLET].add(signer.lower())
if entity_type == EntityType.DEVELOPER_APP:
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
# skip invalid metadata
json_metadata = parse_metadata_json_dict(
metadata, str(action), str(entity_type)
)
if not json_metadata:
continue

raw_address = json_metadata.get("address", None)
Expand All @@ -763,13 +772,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs):
"tasks | entity_manager.py | Missing address or valid app signature in metadata required for add developer app tx"
)
if entity_type == EntityType.GRANT:
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
# skip invalid metadata
json_metadata = parse_metadata_json_dict(
metadata, str(action), str(entity_type)
)
if not json_metadata:
continue

raw_grantee_address = json_metadata.get("grantee_address", None)
Expand Down Expand Up @@ -799,13 +805,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs):
(signer.lower(), raw_grantor_user_id)
)
if entity_type == EntityType.DASHBOARD_WALLET_USER:
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
# skip invalid metadata
json_metadata = parse_metadata_json_dict(
metadata, str(action), str(entity_type)
)
if not json_metadata:
continue

raw_wallet = json_metadata.get("wallet", None)
Expand Down Expand Up @@ -858,13 +861,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs):
entities_to_fetch[EntityType.USER].add(user_id)

if entity_type == EntityType.ENCRYPTED_EMAIL:
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
# skip invalid metadata
json_metadata = parse_metadata_json_dict(
metadata, str(action), str(entity_type)
)
if not json_metadata:
continue

# Add email owner's record to fetch
Expand All @@ -875,13 +875,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs):
)

if entity_type == EntityType.EMAIL_ACCESS and action == Action.UPDATE:
try:
json_metadata = json.loads(metadata)
except Exception as e:
logger.error(
f"tasks | entity_manager.py | Exception deserializing {action} {entity_type} event metadata: {e}"
)
# skip invalid metadata
json_metadata = parse_metadata_json_dict(
metadata, str(action), str(entity_type)
)
if not json_metadata:
continue

# Add email access record to fetch
Expand Down