From 78719893dae410216d798437a3e6380c976eafa2 Mon Sep 17 00:00:00 2001 From: Daksh Tulsyan Date: Thu, 4 Jun 2026 21:04:09 +0000 Subject: [PATCH 1/4] Add script to extract provenance, dataset, and source metadata for Croissant --- util/extract_source_dataset_provenance.py | 223 ++++++++++++ .../extract_source_dataset_provenance_test.py | 317 ++++++++++++++++++ 2 files changed, 540 insertions(+) create mode 100644 util/extract_source_dataset_provenance.py create mode 100644 util/extract_source_dataset_provenance_test.py diff --git a/util/extract_source_dataset_provenance.py b/util/extract_source_dataset_provenance.py new file mode 100644 index 0000000000..eb8b0f9feb --- /dev/null +++ b/util/extract_source_dataset_provenance.py @@ -0,0 +1,223 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Utility script to extract the Data Commons Provenance hierarchy. + +This script fetches all Provenance nodes from the Data Commons Knowledge Graph, +resolves their associated Dataset and Source nodes, and outputs the hierarchy +as a structured JSON file. +""" + +import json +import os +import sys + +from absl import app +from absl import flags +from absl import logging + +# Ensure local imports work correctly if run from repo root +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from util.dc_api_wrapper import dc_api_wrapper, get_datacommons_client + +FLAGS = flags.FLAGS + +flags.DEFINE_string( + 'output_file', 'provenances_full.json', + 'Path to the output JSON file where the hierarchy will be saved.') +flags.DEFINE_string( + 'api_key', None, + 'Data Commons API key. If not provided, the DC_API_KEY environment variable is used.' +) + + +def get_node_property(node_data, prop_name, default=None): + """Helper to extract a property value from the node data dictionary.""" + arcs = node_data.get("arcs", {}) + prop_nodes = arcs.get(prop_name, {}).get("nodes", []) + if not prop_nodes: + return default + return prop_nodes[0].get("value") + + +def get_node_dcid(node_data, prop_name): + """Helper to extract a DCID from a property.""" + arcs = node_data.get("arcs", {}) + prop_nodes = arcs.get(prop_name, {}).get("nodes", []) + if not prop_nodes: + return None + return prop_nodes[0].get("dcid") + + +def fetch_all_provenances(api_key: str, output_file: str) -> None: + """Fetches Provenance nodes and traverses to Dataset and Source levels.""" + client = get_datacommons_client({'dc_api_key': api_key}) + + # 1. Get all nodes of type 'Provenance' + logging.info("Fetching Provenance DCIDs...") + try: + res = dc_api_wrapper(function=client.node.fetch, + args={ + 'node_dcids': ['Provenance'], + 'expression': '<-typeOf' + }, + use_cache=True) + if not res: + logging.error("No response from Data Commons API.") + return + + res_dict = res.to_dict() + provenance_nodes = res_dict.get("data", {}).get("Provenance", {}).get( + "arcs", {}).get("typeOf", {}).get("nodes", []) + provenance_dcids = [ + dcid for node in provenance_nodes if (dcid := node.get("dcid")) + ] + except Exception as e: + logging.error(f"Error fetching provenances: {e}") + return + + if not provenance_dcids: + logging.warning("No provenances found.") + return + + logging.info(f"Found {len(provenance_dcids)} provenances.") + + # 2. Fetch Provenance details and find Dataset DCIDs + logging.info("Fetching Provenance details...") + provenance_data_map = {} + dataset_dcids = set() + + batch_size = 50 + for i in range(0, len(provenance_dcids), batch_size): + batch = provenance_dcids[i:i + batch_size] + batch_res = dc_api_wrapper(function=client.node.fetch, + args={ + 'node_dcids': batch, + 'expression': '->*' + }, + use_cache=True) + if not batch_res: + continue + data = batch_res.to_dict().get("data", {}) + + for dcid in batch: + node_data = data.get(dcid, {}) + prov_entry = { + "dcid": dcid, + "name": get_node_property(node_data, "name"), + "description": get_node_property(node_data, "description"), + "sourceDataUrl": get_node_property(node_data, "sourceDataUrl"), + "license": get_node_property(node_data, "license"), + "dataset_dcid": get_node_dcid(node_data, "isPartOf") + } + provenance_data_map[dcid] = prov_entry + if prov_entry["dataset_dcid"]: + dataset_dcids.add(prov_entry["dataset_dcid"]) + + # 3. Fetch Dataset details and find Source DCIDs + logging.info(f"Fetching {len(dataset_dcids)} Dataset details...") + dataset_data_map = {} + source_dcids = set() + + dataset_list = list(dataset_dcids) + for i in range(0, len(dataset_list), batch_size): + batch = dataset_list[i:i + batch_size] + batch_res = dc_api_wrapper(function=client.node.fetch, + args={ + 'node_dcids': batch, + 'expression': '->*' + }, + use_cache=True) + if not batch_res: + continue + data = batch_res.to_dict().get("data", {}) + + for dcid in batch: + node_data = data.get(dcid, {}) + ds_entry = { + "name": get_node_property(node_data, "name"), + "url": get_node_property(node_data, "url"), + "source_dcid": get_node_dcid(node_data, "isPartOf") + } + dataset_data_map[dcid] = ds_entry + if ds_entry["source_dcid"]: + source_dcids.add(ds_entry["source_dcid"]) + + # 4. Fetch Source details + logging.info(f"Fetching {len(source_dcids)} Source details...") + source_data_map = {} + source_list = list(source_dcids) + for i in range(0, len(source_list), batch_size): + batch = source_list[i:i + batch_size] + batch_res = dc_api_wrapper(function=client.node.fetch, + args={ + 'node_dcids': batch, + 'expression': '->*' + }, + use_cache=True) + if not batch_res: + continue + data = batch_res.to_dict().get("data", {}) + + for dcid in batch: + node_data = data.get(dcid, {}) + source_data_map[dcid] = { + "name": get_node_property(node_data, "name"), + "url": get_node_property(node_data, "url") + } + + # 5. Assemble final hierarchy + logging.info("Assembling final hierarchy...") + final_output = [] + for prov_dcid, prov in provenance_data_map.items(): + ds_dcid = prov.pop("dataset_dcid") + dataset_info = None + + if ds_dcid: + ds_data = dataset_data_map.get(ds_dcid, {}) + src_dcid = ds_data.get("source_dcid") + source_info = None + + if src_dcid: + source_info = source_data_map.get(src_dcid) + + dataset_info = { + "name": ds_data.get("name"), + "url": ds_data.get("url"), + "source": source_info + } + + prov["dataset"] = dataset_info + final_output.append(prov) + + with open(output_file, "w") as f: + json.dump(final_output, f, indent=2) + + logging.info( + f"Successfully wrote {len(final_output)} provenances to {output_file}") + + +def main(_): + api_key = FLAGS.api_key or os.environ.get('DC_API_KEY') + if not api_key: + logging.fatal( + "DC_API_KEY environment variable not set and --api_key flag not provided." + ) + sys.exit(1) + + fetch_all_provenances(api_key=api_key, output_file=FLAGS.output_file) + + +if __name__ == '__main__': + app.run(main) diff --git a/util/extract_source_dataset_provenance_test.py b/util/extract_source_dataset_provenance_test.py new file mode 100644 index 0000000000..064816c803 --- /dev/null +++ b/util/extract_source_dataset_provenance_test.py @@ -0,0 +1,317 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for extract_source_dataset_provenance.py.""" + +import json +import os +import sys +import tempfile +import unittest +from unittest import mock + +# Python path is managed by running from the data directory. + +from util import extract_source_dataset_provenance + + +class TestExtractSourceDatasetProvenance(unittest.TestCase): + + def setUp(self): + super().setUp() + # Create a temporary directory for output files + self.test_dir = tempfile.TemporaryDirectory() + self.output_file = os.path.join(self.test_dir.name, 'output.json') + + def tearDown(self): + self.test_dir.cleanup() + super().tearDown() + + def test_get_node_property(self): + """Tests extracting string values from node property structures.""" + # Value exists + node_data_with_val = { + 'arcs': { + 'name': { + 'nodes': [{ + 'value': 'Test Name' + }] + } + } + } + self.assertEqual( + extract_source_dataset_provenance.get_node_property( + node_data_with_val, 'name'), 'Test Name') + + # Empty list of nodes + node_data_empty = {'arcs': {'name': {'nodes': []}}} + self.assertEqual( + extract_source_dataset_provenance.get_node_property( + node_data_empty, 'name', default='Fallback'), 'Fallback') + + # Missing arcs + self.assertEqual( + extract_source_dataset_provenance.get_node_property( + {}, 'name', default='Fallback'), 'Fallback') + + def test_get_node_dcid(self): + """Tests extracting DCID values from node property structures.""" + # DCID exists + node_data_with_dcid = { + 'arcs': { + 'isPartOf': { + 'nodes': [{ + 'dcid': 'dc/ds/ds1' + }] + } + } + } + self.assertEqual( + extract_source_dataset_provenance.get_node_dcid( + node_data_with_dcid, 'isPartOf'), 'dc/ds/ds1') + + # Empty nodes list + node_data_empty = {'arcs': {'isPartOf': {'nodes': []}}} + self.assertIsNone( + extract_source_dataset_provenance.get_node_dcid( + node_data_empty, 'isPartOf')) + + # Missing property + self.assertIsNone( + extract_source_dataset_provenance.get_node_dcid({}, 'isPartOf')) + + @mock.patch('util.extract_source_dataset_provenance.dc_api_wrapper') + @mock.patch('util.extract_source_dataset_provenance.get_datacommons_client') + def test_fetch_all_provenances_success(self, mock_get_client, mock_wrapper): + """Tests successful fetches and hierarchy resolver end-to-end.""" + # Standard configuration for API mocks + mock_client = mock.Mock() + mock_get_client.return_value = mock_client + + # Define mock return objects + res_type_of = mock.Mock() + res_type_of.to_dict.return_value = { + 'data': { + 'Provenance': { + 'arcs': { + 'typeOf': { + 'nodes': [ + { + 'dcid': 'dc/prov/prov1' + }, + { + 'dcid': 'dc/prov/prov2' + }, + { + 'dcid': '' + }, # testing filtering out empty dcids + ] + } + } + } + } + } + + res_prov_details = mock.Mock() + res_prov_details.to_dict.return_value = { + 'data': { + 'dc/prov/prov1': { + 'arcs': { + 'name': { + 'nodes': [{ + 'value': 'Provenance One' + }] + }, + 'description': { + 'nodes': [{ + 'value': 'Desc One' + }] + }, + 'sourceDataUrl': { + 'nodes': [{ + 'value': 'http://url1' + }] + }, + 'license': { + 'nodes': [{ + 'value': 'CC-BY' + }] + }, + 'isPartOf': { + 'nodes': [{ + 'dcid': 'dc/ds/ds1' + }] + }, + } + }, + 'dc/prov/prov2': { + 'arcs': { + 'name': { + 'nodes': [{ + 'value': 'Provenance Two' + }] + }, + 'license': { + 'nodes': [{ + 'value': 'Public Domain' + }] + }, + 'isPartOf': { + 'nodes': [] + }, # lacks dataset link + } + }, + } + } + + res_ds_details = mock.Mock() + res_ds_details.to_dict.return_value = { + 'data': { + 'dc/ds/ds1': { + 'arcs': { + 'name': { + 'nodes': [{ + 'value': 'Dataset One' + }] + }, + 'url': { + 'nodes': [{ + 'value': 'http://dataset1' + }] + }, + 'isPartOf': { + 'nodes': [{ + 'dcid': 'dc/src/src1' + }] + }, + } + } + } + } + + res_src_details = mock.Mock() + res_src_details.to_dict.return_value = { + 'data': { + 'dc/src/src1': { + 'arcs': { + 'name': { + 'nodes': [{ + 'value': 'Source One' + }] + }, + 'url': { + 'nodes': [{ + 'value': 'http://source1' + }] + }, + } + } + } + } + + # API calls matching order of execution in script: + # 1. Fetch Provenances by typeOf + # 2. Fetch details for batch of provenances + # 3. Fetch details for batch of datasets + # 4. Fetch details for batch of sources + mock_wrapper.side_effect = [ + res_type_of, + res_prov_details, + res_ds_details, + res_src_details, + ] + + extract_source_dataset_provenance.fetch_all_provenances( + api_key='dummy_key', output_file=self.output_file) + + # 1. Verify client was created + mock_get_client.assert_called_once_with({'dc_api_key': 'dummy_key'}) + + # 2. Verify Output file generated correctly + self.assertTrue(os.path.exists(self.output_file)) + with open(self.output_file, 'r') as f: + output_data = json.load(f) + + # 3. Validate resulting structure matching expected hierarchy + expected_output = [ + { + 'dcid': 'dc/prov/prov1', + 'name': 'Provenance One', + 'description': 'Desc One', + 'sourceDataUrl': 'http://url1', + 'license': 'CC-BY', + 'dataset': { + 'name': 'Dataset One', + 'url': 'http://dataset1', + 'source': { + 'name': 'Source One', + 'url': 'http://source1' + }, + }, + }, + { + 'dcid': 'dc/prov/prov2', + 'name': 'Provenance Two', + 'description': None, + 'sourceDataUrl': None, + 'license': 'Public Domain', + 'dataset': None, + }, + ] + self.assertEqual(output_data, expected_output) + + @mock.patch('util.extract_source_dataset_provenance.dc_api_wrapper') + @mock.patch('util.extract_source_dataset_provenance.get_datacommons_client') + def test_fetch_all_provenances_empty(self, mock_get_client, mock_wrapper): + """Tests execution flow when no provenances are returned.""" + mock_client = mock.Mock() + mock_get_client.return_value = mock_client + + res_type_of = mock.Mock() + res_type_of.to_dict.return_value = { + 'data': { + 'Provenance': { + 'arcs': { + 'typeOf': { + 'nodes': [] + } + } + } + } + } + mock_wrapper.return_value = res_type_of + + extract_source_dataset_provenance.fetch_all_provenances( + api_key='dummy_key', output_file=self.output_file) + + # Output file should not have been written to + self.assertFalse(os.path.exists(self.output_file)) + + @mock.patch('util.extract_source_dataset_provenance.dc_api_wrapper') + @mock.patch('util.extract_source_dataset_provenance.get_datacommons_client') + def test_fetch_all_provenances_api_failure(self, mock_get_client, + mock_wrapper): + """Tests correct handling of exceptions thrown by the API wrapper.""" + mock_client = mock.Mock() + mock_get_client.return_value = mock_client + + mock_wrapper.side_effect = RuntimeError('API failure') + + # Should not raise exception out of function, but log and return gracefully + extract_source_dataset_provenance.fetch_all_provenances( + api_key='dumm_key', output_file=self.output_file) + self.assertFalse(os.path.exists(self.output_file)) + + +if __name__ == '__main__': + unittest.main() From 19ae732e581ec65130280afc56e52e1b46fa02a6 Mon Sep 17 00:00:00 2001 From: Daksh Tulsyan Date: Fri, 5 Jun 2026 11:02:00 +0000 Subject: [PATCH 2/4] Apply Gemini code review suggestions --- util/extract_source_dataset_provenance.py | 31 +++++++++++-------- .../extract_source_dataset_provenance_test.py | 9 ++++++ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/util/extract_source_dataset_provenance.py b/util/extract_source_dataset_provenance.py index eb8b0f9feb..fbd55fdf5a 100644 --- a/util/extract_source_dataset_provenance.py +++ b/util/extract_source_dataset_provenance.py @@ -44,6 +44,8 @@ def get_node_property(node_data, prop_name, default=None): """Helper to extract a property value from the node data dictionary.""" + if not node_data: + return default arcs = node_data.get("arcs", {}) prop_nodes = arcs.get(prop_name, {}).get("nodes", []) if not prop_nodes: @@ -53,6 +55,8 @@ def get_node_property(node_data, prop_name, default=None): def get_node_dcid(node_data, prop_name): """Helper to extract a DCID from a property.""" + if not node_data: + return None arcs = node_data.get("arcs", {}) prop_nodes = arcs.get(prop_name, {}).get("nodes", []) if not prop_nodes: @@ -185,23 +189,24 @@ def fetch_all_provenances(api_key: str, output_file: str) -> None: dataset_info = None if ds_dcid: - ds_data = dataset_data_map.get(ds_dcid, {}) - src_dcid = ds_data.get("source_dcid") - source_info = None - - if src_dcid: - source_info = source_data_map.get(src_dcid) - - dataset_info = { - "name": ds_data.get("name"), - "url": ds_data.get("url"), - "source": source_info - } + ds_data = dataset_data_map.get(ds_dcid) + if ds_data: + src_dcid = ds_data.get("source_dcid") + source_info = None + + if src_dcid: + source_info = source_data_map.get(src_dcid) + + dataset_info = { + "name": ds_data.get("name"), + "url": ds_data.get("url"), + "source": source_info + } prov["dataset"] = dataset_info final_output.append(prov) - with open(output_file, "w") as f: + with open(output_file, "w", encoding="utf-8") as f: json.dump(final_output, f, indent=2) logging.info( diff --git a/util/extract_source_dataset_provenance_test.py b/util/extract_source_dataset_provenance_test.py index 064816c803..29c29bad14 100644 --- a/util/extract_source_dataset_provenance_test.py +++ b/util/extract_source_dataset_provenance_test.py @@ -64,6 +64,11 @@ def test_get_node_property(self): extract_source_dataset_provenance.get_node_property( {}, 'name', default='Fallback'), 'Fallback') + # None data + self.assertEqual( + extract_source_dataset_provenance.get_node_property( + None, 'name', default='Fallback'), 'Fallback') + def test_get_node_dcid(self): """Tests extracting DCID values from node property structures.""" # DCID exists @@ -90,6 +95,10 @@ def test_get_node_dcid(self): self.assertIsNone( extract_source_dataset_provenance.get_node_dcid({}, 'isPartOf')) + # None data + self.assertIsNone( + extract_source_dataset_provenance.get_node_dcid(None, 'isPartOf')) + @mock.patch('util.extract_source_dataset_provenance.dc_api_wrapper') @mock.patch('util.extract_source_dataset_provenance.get_datacommons_client') def test_fetch_all_provenances_success(self, mock_get_client, mock_wrapper): From f1f4900909ae7eab6894041b66250e409f65cafc Mon Sep 17 00:00:00 2001 From: Daksh Tulsyan Date: Mon, 8 Jun 2026 07:28:19 +0000 Subject: [PATCH 3/4] Address review comments: Remove --api_key flag and increase batch size --- util/extract_source_dataset_provenance.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/util/extract_source_dataset_provenance.py b/util/extract_source_dataset_provenance.py index fbd55fdf5a..3043abadd1 100644 --- a/util/extract_source_dataset_provenance.py +++ b/util/extract_source_dataset_provenance.py @@ -16,6 +16,8 @@ This script fetches all Provenance nodes from the Data Commons Knowledge Graph, resolves their associated Dataset and Source nodes, and outputs the hierarchy as a structured JSON file. + +Requires the DC_API_KEY environment variable to be set. """ import json @@ -36,10 +38,6 @@ flags.DEFINE_string( 'output_file', 'provenances_full.json', 'Path to the output JSON file where the hierarchy will be saved.') -flags.DEFINE_string( - 'api_key', None, - 'Data Commons API key. If not provided, the DC_API_KEY environment variable is used.' -) def get_node_property(node_data, prop_name, default=None): @@ -102,7 +100,7 @@ def fetch_all_provenances(api_key: str, output_file: str) -> None: provenance_data_map = {} dataset_dcids = set() - batch_size = 50 + batch_size = 100 for i in range(0, len(provenance_dcids), batch_size): batch = provenance_dcids[i:i + batch_size] batch_res = dc_api_wrapper(function=client.node.fetch, @@ -214,10 +212,10 @@ def fetch_all_provenances(api_key: str, output_file: str) -> None: def main(_): - api_key = FLAGS.api_key or os.environ.get('DC_API_KEY') + api_key = os.environ.get('DC_API_KEY') if not api_key: logging.fatal( - "DC_API_KEY environment variable not set and --api_key flag not provided." + "DC_API_KEY environment variable not set." ) sys.exit(1) From 43b7318155b57e8dcf6ee8fb2d9e698303a581eb Mon Sep 17 00:00:00 2001 From: Daksh Tulsyan Date: Mon, 8 Jun 2026 09:37:25 +0000 Subject: [PATCH 4/4] Format code using yapf --- util/extract_source_dataset_provenance.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/util/extract_source_dataset_provenance.py b/util/extract_source_dataset_provenance.py index 3043abadd1..3ca0fb32c9 100644 --- a/util/extract_source_dataset_provenance.py +++ b/util/extract_source_dataset_provenance.py @@ -194,7 +194,7 @@ def fetch_all_provenances(api_key: str, output_file: str) -> None: if src_dcid: source_info = source_data_map.get(src_dcid) - + dataset_info = { "name": ds_data.get("name"), "url": ds_data.get("url"), @@ -214,9 +214,7 @@ def fetch_all_provenances(api_key: str, output_file: str) -> None: def main(_): api_key = os.environ.get('DC_API_KEY') if not api_key: - logging.fatal( - "DC_API_KEY environment variable not set." - ) + logging.fatal("DC_API_KEY environment variable not set.") sys.exit(1) fetch_all_provenances(api_key=api_key, output_file=FLAGS.output_file)