Merge CDA feed handler with ingestion helper#523
Conversation
Not up to standards ⛔🔴 Issues
|
| Category | Results |
|---|---|
| Documentation | 15 minor |
| CodeStyle | 1 minor |
| Complexity | 2 medium |
🟢 Metrics 27 complexity · 2 duplication
Metric Results Complexity 27 Duplication 2
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
There was a problem hiding this comment.
Code Review
This pull request consolidates the separate import-helper service into the ingestion-helper service, removing the standalone service's deployment steps, Terraform resources, and source files, and routing GCS CDA Pub/Sub feed events directly to ingestion-helper. The review feedback identifies critical issues in the refactored feed_event_utils.py file, including a generic exception handler in the duplicate checking logic that could cause transient errors to silently skip imports, and a missing validation for import_name that could lead to runtime crashes.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| try: | ||
| blob.upload_from_string("", if_generation_match=0) | ||
| except Exception: | ||
| duplicate = True |
There was a problem hiding this comment.
Catching a generic Exception here means any transient error (such as a network timeout, rate limit, or temporary GCS outage) will be treated as a duplicate message. This causes the function to return OK, 200 (acknowledging the Pub/Sub message) and silently skip the import. Instead, you should specifically catch PreconditionFailed to identify duplicates, and let other exceptions propagate so Pub/Sub can retry the message.
| try: | |
| blob.upload_from_string("", if_generation_match=0) | |
| except Exception: | |
| duplicate = True | |
| try: | |
| blob.upload_from_string("", if_generation_match=0) | |
| except PreconditionFailed: | |
| duplicate = True |
| logging.info(f"Message {message_id} already processed. Skipping.") | ||
| return 'OK', 200 | ||
|
|
||
| import_name = attributes.get('import_name') |
There was a problem hiding this comment.
If import_name is missing or None, subsequent operations like import_name.replace(':', '/') (line 222) and import_name.split(':') (line 39 of invoke_spanner_ingestion_workflow) will raise an AttributeError and crash the function. We should validate that import_name is present and non-empty, and return an error or log and skip if it is missing.
| import_name = attributes.get('import_name') | |
| import_name = attributes.get('import_name') | |
| if not import_name: | |
| logging.error("Missing 'import_name' in message attributes.") | |
| return 'Missing import_name attribute', 400 |
| from google.cloud import storage | ||
| from google.cloud.workflows import executions_v1 | ||
| import requests | ||
| import import_utils |
There was a problem hiding this comment.
Import PreconditionFailed from google.api_core.exceptions to allow catching only the specific exception raised when a duplicate message is detected.
| from google.cloud import storage | |
| from google.cloud.workflows import executions_v1 | |
| import requests | |
| import import_utils | |
| from google.api_core.exceptions import PreconditionFailed | |
| from google.cloud import storage | |
| from google.cloud.workflows import executions_v1 | |
| import import_utils |
No description provided.