Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions import-automation/workflow/ingestion-helper/aggregation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,103 @@ def run_provenance_summary_aggregation(self, import_names: List[str]) -> None:
GROUP BY variable_measured, provenance_dcid;
"""
self.executor.execute(query)
class PlaceAggregationGenerator:
"""Generates and runs place-based aggregations (e.g., summing sub-place data to parent places)."""
def __init__(self, executor: BigQueryExecutor, is_base_dc: bool = True) -> None:
self.executor = executor
self.is_base_dc = is_base_dc

def run_all(self, import_names: List[str]) -> None:
"""Runs all place aggregations in sequence."""
if not import_names:
logging.info("No imports specified. Skipping place aggregations.")
return

logging.info(f"Running place aggregations for imports: {import_names}")
self.aggregate_us_population_from_states(import_names)
self.aggregate_us_state_population_from_counties(import_names)

def aggregate_us_population_from_states(self, import_names: List[str]) -> None:
"""Calculates US country population by summing up populations of its states."""
if not import_names:
return

dest = self.executor.get_spanner_destination_uri()
connection_id = self.executor.connection_id
imports_str = ", ".join([f"'{name}'" for name in import_names])

# Placeholder SQL query that shows how the aggregation from US states to US country would work.
query = f"""
-- Placeholder: Calculate US population by summing state populations
-- 1. Fetch all population observations for US states and US country.
-- 2. Sum state-level populations and write/update country-level population.

CREATE OR REPLACE TEMPORARY TABLE `temp_state_populations` AS
SELECT
observation_about,
variable_measured,
value_num,
date_val
FROM (
-- This is a conceptual query structure reflecting state population lookup
SELECT
'geoId/06' as observation_about, -- California as placeholder
'Count_Person' as variable_measured,
39000000.0 as value_num,
'2020' as date_val
);

-- Conceptual aggregation logic:
-- SELECT SUM(value_num) as us_population, date_val
-- FROM temp_state_populations
-- GROUP BY date_val;

SELECT 'USA' as place, 'Count_Person' as variable, 330000000.0 as sum_population;
"""

logging.info("Running placeholder US population aggregation query...")
self.executor.execute(query)

def aggregate_us_state_population_from_counties(self, import_names: List[str]) -> None:
"""Calculates US state populations by summing up populations of their counties."""
if not import_names:
return

dest = self.executor.get_spanner_destination_uri()
connection_id = self.executor.connection_id
imports_str = ", ".join([f"'{name}'" for name in import_names])

# Placeholder SQL query that shows how the aggregation from US counties to US states would work.
query = f"""
-- Placeholder: Calculate US state populations by summing county populations
-- 1. Fetch all population observations for US counties and US states.
-- 2. Sum county-level populations and write/update state-level populations.

CREATE OR REPLACE TEMPORARY TABLE `temp_county_populations` AS
SELECT
observation_about,
variable_measured,
value_num,
date_val
FROM (
-- This is a conceptual query structure reflecting county population lookup
SELECT
'geoId/06075' as observation_about, -- San Francisco County as placeholder
'Count_Person' as variable_measured,
800000.0 as value_num,
'2020' as date_val
);

-- Conceptual aggregation logic:
-- SELECT SUM(value_num) as state_population, date_val
-- FROM temp_county_populations
-- GROUP BY date_val;

SELECT 'geoId/06' as place, 'Count_Person' as variable, 39000000.0 as sum_population;
"""

logging.info("Running placeholder State population aggregation query...")
self.executor.execute(query)


class AggregationUtils:
Expand All @@ -577,6 +674,7 @@ def __init__(self,
)
self.linked_edge_generator = LinkedEdgeGenerator(self.executor, is_base_dc)
self.provenance_summary_generator = ProvenanceSummaryGenerator(self.executor, is_base_dc)
self.place_aggregation_generator = PlaceAggregationGenerator(self.executor, is_base_dc)

def run_aggregation(self, import_list: List[Dict[str, Any]]) -> bool:
"""
Expand All @@ -601,6 +699,7 @@ def run_aggregation(self, import_list: List[Dict[str, Any]]) -> bool:

# 2. Run global aggregations
self.linked_edge_generator.run_all(import_names)
self.place_aggregation_generator.run_all(import_names)
self.provenance_summary_generator.run_all(import_names)

return True
Expand Down
Loading