[DCP - Ingestion] DCP Bridge Mode improvements#519

Open
gmechali wants to merge 26 commits into datacommonsorg:master from gmechali:newv

@gmechali gmechali commented Jun 4, 2026

Refactor Simple Importer Parallel Ingestion, GCS Exporter & Decouple JSON-LD Streaming

Summary

This PR optimizes the parallel ingestion pipeline, decouples JSON-LD streaming, and resolves critical connection pooling and CPU-blocking bottlenecks to speed up the GCS bulk upload phase.

Key Changes

  • GCS Upload Speedup: Replaced PyFilesystem GCSFS client (serialized by instance locks) with the native google-cloud-storage client. Configured the internal HTTP session pool size to match upload thread concurrency (32).
  • Reduces GCS upload time by more than half
  • Decoupled JSON-LD Exporter: Extracted JsonLdStreamDb into its own module (stats/jsonld_stream_db.py) to eliminate circular import dependencies between relational DB logic and GCS streaming exporters.
  • CPU Bottleneck Fixes: Moved Pandas to_records() conversions out of the sequential generator into worker threads, and removed redundant blocking gc.collect() calls from chunk generators.
  • Orchestration: Deferred workflow auto-trigger execution to the very end of Runner.run() to guarantee that Cloud Workflows/Dataflow starts only after all JSON-LD files are fully uploaded to GCS.
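The upload and orchestration points above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `upload_one` and `trigger_workflow` are hypothetical callables standing in for the real GCS upload and workflow trigger, and only the thread count of 32 comes from the description.

```python
from concurrent.futures import ThreadPoolExecutor

UPLOAD_THREADS = 32  # upload concurrency named in the PR description


def upload_all_then_trigger(paths, upload_one, trigger_workflow,
                            max_workers=UPLOAD_THREADS):
  """Uploads every path in parallel, then triggers the workflow once.

  `upload_one` and `trigger_workflow` are hypothetical callables supplied by
  the caller; they are assumptions made for this sketch.
  """
  with ThreadPoolExecutor(max_workers=max_workers) as pool:
    # list() drains the result iterator, so an exception from any upload is
    # raised here and the trigger below is never reached after a failure.
    uploaded = list(pool.map(upload_one, paths))
  # The pool has shut down, so every upload has finished at this point.
  trigger_workflow(uploaded)
  return uploaded
```

Triggering only after the executor block exits is what gives the "starts only after all files are uploaded" guarantee in this sketch.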

Code Health & Thread Safety:

  • Wrapped child progress reporter updates with the parent reentrant lock.
  • Fixed a critical logging bug where failed files incorrectly reported Status
  • Moved all inline imports (threading, requests, tempfile) to top-level.
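For readers unfamiliar with the reentrant-lock pattern mentioned above, here is a minimal sketch. The `ParentReporter` and `ChildReporter` classes are hypothetical stand-ins, not the PR's `ImportReporter` implementation.

```python
import threading


class ParentReporter:
  """Hypothetical parent that owns one reentrant lock shared with children."""

  def __init__(self):
    self.lock = threading.RLock()
    self.statuses = {}

  def child(self, name):
    return ChildReporter(self, name)

  def _snapshot(self):
    # Re-acquires the lock the caller already holds; this would deadlock
    # with a plain Lock and is safe only because RLock is reentrant.
    with self.lock:
      return sorted(self.statuses.items())


class ChildReporter:
  """Hypothetical child that funnels every update through the parent lock."""

  def __init__(self, parent, name):
    self.parent = parent
    self.name = name

  def report(self, status):
    # Hold the parent's lock for the whole update so concurrent children
    # cannot interleave writes to the shared state.
    with self.parent.lock:
      self.parent.statuses[self.name] = status
      return self.parent._snapshot()
```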

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces JsonLdStreamDb to stream triples and observations directly to JSON-LD shards on GCS or local disk, bypassing SQLite. It also adds thread safety to the Nodes and ImportReporter classes and enables parallel ingestion of data files in DCP_BRIDGE mode. The review feedback highlights several critical issues in simple/stats/db.py, including potential AttributeError crashes in _uri_ref when handling non-string values, O(N^2) performance bottlenecks when slicing lists in chunk generators, and type overwriting when a subject has multiple typeOf predicates.
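To illustrate the chunking concern raised in this review: repeatedly re-slicing the remainder of a list (for example `items = items[n:]`) copies the tail on every step, which is quadratic overall. A linear-time alternative, sketched here with a hypothetical `iter_chunks` helper that is not taken from the PR, consumes a single iterator instead.

```python
from itertools import islice


def iter_chunks(items, chunk_size):
  """Yields successive lists of at most `chunk_size` items in one pass."""
  if chunk_size <= 0:
    raise ValueError("chunk_size must be positive")
  iterator = iter(items)
  while True:
    # islice advances the shared iterator, so no element is copied twice.
    chunk = list(islice(iterator, chunk_size))
    if not chunk:
      return
    yield chunk
```

Because it accepts any iterable, the same helper works for lists and for generators that produce records lazily.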

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread simple/stats/db.py Outdated
Comment thread simple/stats/db.py Outdated
Comment thread simple/stats/db.py Outdated
Comment thread simple/stats/db.py Outdated
Comment thread simple/stats/db.py Outdated
@gmechali gmechali requested a review from dwnoble June 4, 2026 21:56

gmechali commented Jun 4, 2026

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces JsonLdStreamDb to stream JSON-LD shards directly to GCS or local disk, optimizing data ingestion in DCP_BRIDGE mode. It also adds parallel file ingestion, thread-safety decorators, and a bug fix for reporting failures. The review feedback highlights three key improvement opportunities: deduplicating non-@type predicate values in the fast node sharding path to ensure parity with rdflib, catching specific json.JSONDecodeError exceptions instead of a broad Exception when parsing custom properties, and removing redundant, risky private API access on the GCS client.
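Two of the suggestions in this review can be sketched in a few lines. Both helpers below are hypothetical and assume the custom properties arrive as a JSON string and that predicate values are hashable; neither is copied from `jsonld_stream_db.py`.

```python
import json
import logging


def parse_custom_properties(raw):
  """Returns the dict encoded in `raw`, or {} if `raw` is not valid JSON."""
  try:
    parsed = json.loads(raw)
  except json.JSONDecodeError as e:
    # Only malformed JSON is swallowed; unrelated errors (for example a
    # TypeError from passing None) still propagate and stay visible.
    logging.warning("Ignoring malformed custom properties: %s", e)
    return {}
  return parsed if isinstance(parsed, dict) else {}


def dedupe_preserving_order(values):
  """Drops repeated values while keeping first-seen order."""
  # dict keys are unique and remember insertion order.
  return list(dict.fromkeys(values))
```

Catching the narrow exception keeps genuine programming errors from being silently converted into empty results.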

Comment thread simple/stats/jsonld_stream_db.py
Comment thread simple/stats/jsonld_stream_db.py Outdated
Comment thread simple/stats/jsonld_stream_db.py

codacy-production Bot commented Jun 4, 2026

Not up to standards ⛔

🔴 Issues 3 high · 5 medium · 38 minor

Alerts:
⚠ 46 issues (≤ 0 issues of at least minor severity)

Results:
46 new issues

Category        Results
UnusedCode      2 medium, 1 minor
Documentation   8 minor
ErrorProne      3 high
CodeStyle       29 minor
Complexity      3 medium

View in Codacy

🟢 Metrics 107 complexity · 8 duplication

Metric Results
Complexity 107
Duplication 8

View in Codacy

if not observations_df.empty:
  records = observations_df.to_records(index=False).tolist()
  with self.lock:
    self._obs_records.extend(records)

This dataframe looks like it could get pretty big and use a lot of memory. Can it write to disk as it goes?
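One possible direction for the question above, as a minimal sketch only: spool records to a temporary file as they arrive instead of extending an in-memory list. `RecordSpool` is a hypothetical class, and the CSV format is an assumption chosen to keep the example in the standard library; values read back are strings.

```python
import csv
import tempfile
import threading


class RecordSpool:
  """Hypothetical append-only spool that keeps records on disk, not in RAM."""

  def __init__(self):
    self._lock = threading.Lock()
    # newline="" is what the csv module expects for files it writes.
    self._file = tempfile.TemporaryFile(mode="w+", newline="",
                                        encoding="utf-8")
    self._writer = csv.writer(self._file)
    self.count = 0

  def extend(self, records):
    records = list(records)
    # Serialize writers so rows from different threads never interleave.
    with self._lock:
      self._writer.writerows(records)
      self.count += len(records)

  def rows(self):
    """Yields rows as lists of strings; call after all writers finish."""
    self._file.flush()
    self._file.seek(0)
    yield from csv.reader(self._file)
    self._file.seek(0, 2)  # back to the end so later writes append

  def close(self):
    self._file.close()
```

With this shape, worker threads would call `extend(records)` where the snippet above extends `self._obs_records`, and the exporter would iterate `rows()` once ingestion is done.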


2 participants