Per imp#525
Conversation
Code under workflow and Terraform folders is copied as is from the data repo as part of the migration. Any changes are part of a separate commit datacommonsorg@6f50438
Not up to standards ⛔🔴 Issues
|
| Category | Results |
|---|---|
| UnusedCode | 2 medium 2 minor |
| BestPractice | 2 medium |
| Documentation | 16 minor |
| ErrorProne | 9 high |
| CodeStyle | 58 minor |
| Complexity | 10 medium |
| Performance | 1 medium |
🟢 Metrics 240 complexity · 14 duplication
Metric Results Complexity 240 Duplication 14
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
There was a problem hiding this comment.
Code Review
This pull request introduces a new isBaseDc configuration flag across the ingestion pipeline to support custom Data Commons setups, adds a Terraform deployment configuration, and implements Cloud Workflows and Cloud Run helper services to automate imports, Spanner ingestion, and BigQuery aggregations. It also introduces JsonLdStreamDb to stream JSON-LD shards directly to GCS or disk, alongside a new metadata validator. The review feedback highlights several critical security and reliability improvements, including replacing custom SQL escaping with parameterized queries to prevent SQL injection, restricting broad IAM roles in Terraform, adding timeouts to HTTP requests and polling loops, globally initializing Spanner and Storage clients to reduce latency, fixing a potential null-value bug in the JSON-LD stream database, and refining file-matching logic and test assertions.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| elif match(file, "*.mcf"): | ||
| mcf_files.append(file) |
There was a problem hiding this comment.
To handle sharded files, check for MCF files using contains('.mcf') (or ".mcf" in file.path in Python) on the file path, not endsWith('.mcf') or match(file, "*.mcf").
| elif match(file, "*.mcf"): | |
| mcf_files.append(file) | |
| elif ".mcf" in file.path: | |
| mcf_files.append(file) |
References
- To handle sharded files, check for MCF files using
contains('.mcf')on the file path, notendsWith('.mcf').
| def _escape_sql_literal(val: str) -> str: | ||
| r"""Escapes a string literal for use in nested BigQuery/Spanner queries. | ||
|
|
||
| This is required because the query string travels through two SQL parsers: | ||
| 1. BigQuery parses the EXTERNAL_QUERY double-quoted string literal. | ||
| 2. Spanner parses the resulting inner query's single-quoted string literal. | ||
|
|
||
| To ensure the value is correctly matched and prevent SQL injection: | ||
| - Backslashes (\) are escaped to 4 backslashes (\\\\) so they survive | ||
| both decodings (\\\\ -> \\ -> \). Otherwise, they may escape quotes | ||
| or be interpreted as control characters (like \b becoming backspace). | ||
| - Double quotes (") are escaped to \\" to prevent terminating BQ string. | ||
| - Single quotes (') are escaped to '' to prevent terminating Spanner string. | ||
| """ | ||
| return val.replace('\\', '\\\\\\\\').replace('"', '\\"').replace("'", "''") |
There was a problem hiding this comment.
Using custom SQL escaping functions like _escape_sql_literal to prevent SQL injection is highly discouraged and prone to bypasses. It is safer to use parameterized queries or structured query building instead of manual string concatenation when inserting variables into SQL queries, especially since import_names can originate from external Pub/Sub attributes.
| args, kwargs = mock_snapshot.execute_sql.call_args | ||
| query = args[0] | ||
| self.assertIn("SELECT subject_id, name, types FROM Node", query) | ||
| self.assertIn("update_timestamp > @timestamp", query) |
There was a problem hiding this comment.
The assertion update_timestamp > @timestamp is too loose because it matches last_update_timestamp > @timestamp as a substring. To ensure the query is strictly correct, assert the exact column name last_update_timestamp.
| self.assertIn("update_timestamp > @timestamp", query) | |
| self.assertIn("last_update_timestamp > @timestamp", query) |
| response = requests.post(INGESTION_HELPER_URL, | ||
| json=request, | ||
| headers=headers) |
There was a problem hiding this comment.
The requests.post call does not specify a timeout parameter. Without a timeout, the request can hang indefinitely if the remote server does not respond. Always specify a reasonable timeout (e.g., timeout=30).
| response = requests.post(INGESTION_HELPER_URL, | |
| json=request, | |
| headers=headers) | |
| response = requests.post(INGESTION_HELPER_URL, | |
| json=request, | |
| headers=headers, | |
| timeout=30) |
| spanner = SpannerClient(FLAGS.spanner_project_id, | ||
| FLAGS.spanner_instance_id, | ||
| FLAGS.spanner_database_id, | ||
| graph_database_id=FLAGS.spanner_graph_database_id, | ||
| location=FLAGS.location, | ||
| model_id=os.environ.get('EMBEDDING_MODEL_ID', | ||
| 'text-embedding-005')) | ||
| storage = StorageClient(FLAGS.gcs_bucket_id) |
There was a problem hiding this comment.
Initializing heavy clients like SpannerClient and StorageClient inside the request handler on every invocation introduces significant latency and connection overhead. It is highly recommended to initialize these clients globally (outside the ingestion_helper function) to enable connection reuse across warm function instances.
| if row.object_id: | ||
| val = _uri_ref(row.object_id) | ||
| else: | ||
| val = _parse_numeric(row.object_value) |
There was a problem hiding this comment.
If val is None (e.g., if both object_id and object_value are empty), val_str will evaluate to "None", resulting in an invalid @type property. Ensure val is not None before processing.
if row.object_id:
val = _uri_ref(row.object_id)
else:
val = _parse_numeric(row.object_value)
if val is None:
continue| while True: | ||
| execution = execution_client.get_execution( | ||
| request={"name": execution_name}) | ||
| state = execution.state | ||
|
|
||
| if state != executions_v1.Execution.State.ACTIVE: | ||
| logging.info(f"Execution finished with state: {state}") | ||
| if state == executions_v1.Execution.State.SUCCEEDED: | ||
| logging.info(f"Workflow {workflow_id} succeeded.") | ||
| return execution.result | ||
| else: | ||
| logging.error( | ||
| f"Workflow {workflow_id} failed: {execution.error}") | ||
| raise RuntimeError( | ||
| f"Workflow {workflow_id} failed with state {state}") | ||
|
|
||
| time.sleep(backoff_delay) | ||
| backoff_delay = min(backoff_delay * 2, | ||
| 60) # Exponential backoff up to 60s |
| resource "google_project_iam_member" "automation_roles" { | ||
| for_each = toset([ | ||
| "roles/workflows.admin", | ||
| "roles/cloudfunctions.admin", | ||
| "roles/run.admin", | ||
| "roles/run.invoker", | ||
| "roles/batch.jobsEditor", | ||
| "roles/dataflow.admin", | ||
| "roles/logging.logWriter", | ||
| "roles/storage.objectAdmin", | ||
| "roles/iam.serviceAccountUser", | ||
| "roles/spanner.databaseAdmin", | ||
| "roles/bigquery.dataEditor", | ||
| "roles/bigquery.jobUser", | ||
| "roles/artifactregistry.admin", | ||
| "roles/secretmanager.secretAccessor", | ||
| "roles/cloudbuild.builds.builder", | ||
| ]) |
There was a problem hiding this comment.
Granting broad admin roles (such as roles/workflows.admin, roles/run.admin, roles/dataflow.admin, roles/spanner.databaseAdmin, roles/artifactregistry.admin) to the unified service account violates the principle of least privilege. Consider using more granular, restricted roles (e.g., roles/run.developer, roles/spanner.databaseUser, etc.) to limit the blast radius in case of a compromise.
Solves two issues: a) Nesting of substitution variables (_VERSION string within dataflow template path) in the cloud build config b) Handling of empty import list and job IDs to not cause ingestion workflow to fail
No description provided.