Per imp #525

Open: gmechali wants to merge 41 commits into datacommonsorg:master from gmechali:perImp

gmechali (Contributor) commented Jun 5, 2026

No description provided.

codacy-production (Bot) commented Jun 5, 2026

Not up to standards ⛔

🔴 Issues 9 high · 15 medium · 76 minor

Alerts:
⚠ 100 issues (≤ 0 issues of at least minor severity)

Results:
100 new issues

Category        Results
UnusedCode      2 medium, 2 minor
BestPractice    2 medium
Documentation   16 minor
ErrorProne      9 high
CodeStyle       58 minor
Complexity      10 medium
Performance     1 medium

🟢 Metrics 240 complexity · 14 duplication

Metric Results
Complexity 240
Duplication 14


gemini-code-assist (Bot) left a comment

Code Review

This pull request introduces a new isBaseDc configuration flag across the ingestion pipeline to support custom Data Commons setups, adds a Terraform deployment configuration, and implements Cloud Workflows and Cloud Run helper services to automate imports, Spanner ingestion, and BigQuery aggregations. It also introduces JsonLdStreamDb to stream JSON-LD shards directly to GCS or disk, alongside a new metadata validator. The review feedback highlights several critical security and reliability improvements, including replacing custom SQL escaping with parameterized queries to prevent SQL injection, restricting broad IAM roles in Terraform, adding timeouts to HTTP requests and polling loops, globally initializing Spanner and Storage clients to reduce latency, fixing a potential null-value bug in the JSON-LD stream database, and refining file-matching logic and test assertions.

Comment thread simple/stats/runner.py
Comment on lines +674 to +675
    elif match(file, "*.mcf"):
      mcf_files.append(file)

high

To handle sharded files, check for MCF files using contains('.mcf') (or ".mcf" in file.path in Python) on the file path, not endsWith('.mcf') or match(file, "*.mcf").

Suggested change
-    elif match(file, "*.mcf"):
-      mcf_files.append(file)
+    elif ".mcf" in file.path:
+      mcf_files.append(file)
References
  1. To handle sharded files, check for MCF files using contains('.mcf') on the file path, not endsWith('.mcf').
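
A minimal sketch of why the glob misses sharded outputs. It assumes the PR's `match` helper behaves like the standard library's `fnmatch.fnmatch` and that shards carry a suffix such as `-00000-of-00002`. Both the helper's behavior and the file names are assumptions for illustration, not taken from the PR.

```python
from fnmatch import fnmatch

# Hypothetical file names; the shard suffix format is an assumption.
paths = [
    "output/nodes.mcf",
    "output/nodes.mcf-00000-of-00002",
    "output/nodes.mcf-00001-of-00002",
    "output/observations.csv",
]

# Glob match: only names that END in ".mcf" qualify.
glob_hits = [p for p in paths if fnmatch(p, "*.mcf")]

# Substring check: any name CONTAINING ".mcf" qualifies, shards included.
substring_hits = [p for p in paths if ".mcf" in p]

print(glob_hits)
print(substring_hits)
```

The substring check is deliberately loose: it would also accept a name like `nodes.mcf.bak`, so a stricter pattern may be worth considering if such files can appear in the input directory.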

Comment on lines +25 to +39
    def _escape_sql_literal(val: str) -> str:
      r"""Escapes a string literal for use in nested BigQuery/Spanner queries.

      This is required because the query string travels through two SQL parsers:
      1. BigQuery parses the EXTERNAL_QUERY double-quoted string literal.
      2. Spanner parses the resulting inner query's single-quoted string literal.

      To ensure the value is correctly matched and prevent SQL injection:
      - Backslashes (\) are escaped to 4 backslashes (\\\\) so they survive
        both decodings (\\\\ -> \\ -> \). Otherwise, they may escape quotes
        or be interpreted as control characters (like \b becoming backspace).
      - Double quotes (") are escaped to \\" to prevent terminating BQ string.
      - Single quotes (') are escaped to '' to prevent terminating Spanner string.
      """
      return val.replace('\\', '\\\\\\\\').replace('"', '\\"').replace("'", "''")

security-high high

Using custom SQL escaping functions like _escape_sql_literal to prevent SQL injection is highly discouraged and prone to bypasses. It is safer to use parameterized queries or structured query building instead of manual string concatenation when inserting variables into SQL queries, especially since import_names can originate from external Pub/Sub attributes.
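
The difference between splicing a value into SQL text and binding it as a parameter can be sketched with the standard library's `sqlite3` module. This is a stand-in only: the table, the values, and the `?` placeholder syntax are assumptions for illustration and are not the BigQuery or Spanner API, and the sketch does not show whether the nested EXTERNAL_QUERY case in this PR can be parameterized the same way.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE imports (name TEXT)")
conn.executemany("INSERT INTO imports VALUES (?)", [("alpha",), ("beta",)])

# A hostile value, as might arrive in an externally supplied attribute.
hostile = "alpha' OR '1'='1"

# Unsafe: the value is spliced into the SQL text and changes the query's logic.
unsafe_sql = f"SELECT name FROM imports WHERE name = '{hostile}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the value is bound as a parameter and is only ever compared as data.
safe_rows = conn.execute(
    "SELECT name FROM imports WHERE name = ?", (hostile,)
).fetchall()

print(len(unsafe_rows))  # every row matches
print(len(safe_rows))    # no row has that literal name
```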

    args, kwargs = mock_snapshot.execute_sql.call_args
    query = args[0]
    self.assertIn("SELECT subject_id, name, types FROM Node", query)
    self.assertIn("update_timestamp > @timestamp", query)

medium

The assertion update_timestamp > @timestamp is too loose because it matches last_update_timestamp > @timestamp as a substring. To ensure the query is strictly correct, assert the exact column name last_update_timestamp.

Suggested change
-    self.assertIn("update_timestamp > @timestamp", query)
+    self.assertIn("last_update_timestamp > @timestamp", query)
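
The substring behavior behind this comment can be checked directly, since `assertIn(a, b)` passes exactly when `a in b` is true. The two query strings below are hypothetical and shortened for illustration.

```python
# Hypothetical queries: one filters on the intended column, one on a different column.
correct_query = "SELECT subject_id FROM Node WHERE last_update_timestamp > @timestamp"
wrong_query = "SELECT subject_id FROM Node WHERE update_timestamp > @timestamp"

loose = "update_timestamp > @timestamp"
strict = "last_update_timestamp > @timestamp"

# The loose fragment is found in both queries, so it cannot tell them apart.
loose_results = (loose in correct_query, loose in wrong_query)

# The strict fragment is found only in the query that uses the intended column.
strict_results = (strict in correct_query, strict in wrong_query)

print(loose_results)
print(strict_results)
```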

Comment on lines +144 to +146
    response = requests.post(INGESTION_HELPER_URL,
                             json=request,
                             headers=headers)

medium

The requests.post call does not specify a timeout parameter. Without a timeout, the request can hang indefinitely if the remote server does not respond. Always specify a reasonable timeout (e.g., timeout=30).

Suggested change
-    response = requests.post(INGESTION_HELPER_URL,
-                             json=request,
-                             headers=headers)
+    response = requests.post(INGESTION_HELPER_URL,
+                             json=request,
+                             headers=headers,
+                             timeout=30)

Comment on lines +77 to +84
    spanner = SpannerClient(FLAGS.spanner_project_id,
                            FLAGS.spanner_instance_id,
                            FLAGS.spanner_database_id,
                            graph_database_id=FLAGS.spanner_graph_database_id,
                            location=FLAGS.location,
                            model_id=os.environ.get('EMBEDDING_MODEL_ID',
                                                    'text-embedding-005'))
    storage = StorageClient(FLAGS.gcs_bucket_id)

medium

Initializing heavy clients like SpannerClient and StorageClient inside the request handler on every invocation introduces significant latency and connection overhead. It is highly recommended to initialize these clients globally (outside the ingestion_helper function) to enable connection reuse across warm function instances.
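
One way to get that reuse is a cached factory, sketched below. `FakeClient`, `get_client`, and `handler` are hypothetical stand-ins, not the PR's `SpannerClient` or `ingestion_helper`; the counter only exists to make the reuse visible.

```python
import functools


class FakeClient:
    """Stand-in for an expensive client; counts how often it is constructed."""
    constructed = 0

    def __init__(self, database_id: str):
        FakeClient.constructed += 1
        self.database_id = database_id


@functools.lru_cache(maxsize=None)
def get_client(database_id: str) -> FakeClient:
    # Built on first use, then reused for later calls with the same argument.
    return FakeClient(database_id)


def handler(request: dict) -> str:
    client = get_client(request["database_id"])
    return f"handled with {client.database_id}"


for _ in range(3):
    handler({"database_id": "db-1"})

print(FakeClient.constructed)
```

Lazy caching is used here instead of constructing at import time on the assumption that configuration, such as parsed flags, may not be available when the module is first loaded.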

Comment on lines +157 to +160
    if row.object_id:
      val = _uri_ref(row.object_id)
    else:
      val = _parse_numeric(row.object_value)

medium

If val is None (e.g., if both object_id and object_value are empty), val_str will evaluate to "None", resulting in an invalid @type property. Ensure val is not None before processing.

    if row.object_id:
      val = _uri_ref(row.object_id)
    else:
      val = _parse_numeric(row.object_value)

    if val is None:
      continue

Comment on lines +55 to +73
    while True:
      execution = execution_client.get_execution(
          request={"name": execution_name})
      state = execution.state

      if state != executions_v1.Execution.State.ACTIVE:
        logging.info(f"Execution finished with state: {state}")
        if state == executions_v1.Execution.State.SUCCEEDED:
          logging.info(f"Workflow {workflow_id} succeeded.")
          return execution.result
        else:
          logging.error(
              f"Workflow {workflow_id} failed: {execution.error}")
          raise RuntimeError(
              f"Workflow {workflow_id} failed with state {state}")

      time.sleep(backoff_delay)
      backoff_delay = min(backoff_delay * 2,
                          60)  # Exponential backoff up to 60s

medium

The polling loop for workflow execution has no maximum timeout or maximum retry limit. If a workflow gets stuck or runs indefinitely, this Python script will poll forever. Implement a maximum timeout or maximum number of attempts to prevent infinite hangs.
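
A deadline-bounded version of such a loop might look like the sketch below. It is not tied to the Workflows client: `poll_until_done`, its parameters, and the string states are hypothetical, and `get_state` stands in for whatever call fetches the execution state.

```python
import time
from typing import Callable


def poll_until_done(get_state: Callable[[], str],
                    timeout_s: float,
                    initial_delay_s: float = 1.0,
                    max_delay_s: float = 60.0) -> str:
    """Polls get_state() until it is no longer "ACTIVE" or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        state = get_state()
        if state != "ACTIVE":
            return state
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"still ACTIVE after {timeout_s}s")
        # Never sleep past the deadline.
        time.sleep(min(delay, remaining))
        # Exponential backoff, capped at max_delay_s.
        delay = min(delay * 2, max_delay_s)


# Usage with a fake state source that finishes on the third poll.
states = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
result = poll_until_done(lambda: next(states), timeout_s=5, initial_delay_s=0.01)
print(result)
```

`time.monotonic()` is used for the deadline so that wall-clock adjustments do not shorten or extend the wait.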

Comment on lines +316 to +333
    resource "google_project_iam_member" "automation_roles" {
      for_each = toset([
        "roles/workflows.admin",
        "roles/cloudfunctions.admin",
        "roles/run.admin",
        "roles/run.invoker",
        "roles/batch.jobsEditor",
        "roles/dataflow.admin",
        "roles/logging.logWriter",
        "roles/storage.objectAdmin",
        "roles/iam.serviceAccountUser",
        "roles/spanner.databaseAdmin",
        "roles/bigquery.dataEditor",
        "roles/bigquery.jobUser",
        "roles/artifactregistry.admin",
        "roles/secretmanager.secretAccessor",
        "roles/cloudbuild.builds.builder",
      ])

security-medium medium

Granting broad admin roles (such as roles/workflows.admin, roles/run.admin, roles/dataflow.admin, roles/spanner.databaseAdmin, roles/artifactregistry.admin) to the unified service account violates the principle of least privilege. Consider using more granular, restricted roles (e.g., roles/run.developer, roles/spanner.databaseUser, etc.) to limit the blast radius in case of a compromise.

vish-cs and others added 3 commits June 5, 2026 14:53
Solves two issues:
a) Nesting of substitution variables (_VERSION string within dataflow template path) in the cloud build config
b) Handling of empty import list and job IDs to not cause ingestion workflow to fail
3 participants