class PlaceAggregationGenerator:
    """Generates and runs place-based aggregations.

    Place-based aggregations roll sub-place data up to parent places (e.g.,
    summing US state populations into a US country population).

    NOTE: both aggregations are currently placeholders. Their SQL contains
    only hard-coded sample values and does not reference the import names
    passed in; the import list is used solely to decide whether to run.
    """

    # The queries are plain (non-f) strings on purpose: they contain no
    # interpolated values, so a literal `{` or `}` added to the SQL later
    # cannot be mistaken for a format placeholder.

    # Placeholder SQL showing how aggregation from US states to the US
    # country would work.
    _US_FROM_STATES_QUERY = """
        -- Placeholder: Calculate US population by summing state populations
        -- 1. Fetch all population observations for US states and US country.
        -- 2. Sum state-level populations and write/update country-level population.

        CREATE OR REPLACE TEMPORARY TABLE `temp_state_populations` AS
        SELECT
            observation_about,
            variable_measured,
            value_num,
            date_val
        FROM (
            -- This is a conceptual query structure reflecting state population lookup
            SELECT
                'geoId/06' as observation_about,  -- California as placeholder
                'Count_Person' as variable_measured,
                39000000.0 as value_num,
                '2020' as date_val
        );

        -- Conceptual aggregation logic:
        -- SELECT SUM(value_num) as us_population, date_val
        -- FROM temp_state_populations
        -- GROUP BY date_val;

        SELECT 'USA' as place, 'Count_Person' as variable, 330000000.0 as sum_population;
        """

    # Placeholder SQL showing how aggregation from US counties to US states
    # would work.
    _US_STATES_FROM_COUNTIES_QUERY = """
        -- Placeholder: Calculate US state populations by summing county populations
        -- 1. Fetch all population observations for US counties and US states.
        -- 2. Sum county-level populations and write/update state-level populations.

        CREATE OR REPLACE TEMPORARY TABLE `temp_county_populations` AS
        SELECT
            observation_about,
            variable_measured,
            value_num,
            date_val
        FROM (
            -- This is a conceptual query structure reflecting county population lookup
            SELECT
                'geoId/06075' as observation_about,  -- San Francisco County as placeholder
                'Count_Person' as variable_measured,
                800000.0 as value_num,
                '2020' as date_val
        );

        -- Conceptual aggregation logic:
        -- SELECT SUM(value_num) as state_population, date_val
        -- FROM temp_county_populations
        -- GROUP BY date_val;

        SELECT 'geoId/06' as place, 'Count_Person' as variable, 39000000.0 as sum_population;
        """

    def __init__(self,
                 executor: BigQueryExecutor,
                 is_base_dc: bool = True) -> None:
        """Stores the collaborators used by the aggregation methods.

        Args:
            executor: Object whose `execute(query)` method runs a SQL string.
            is_base_dc: Stored on the instance; not read by the current
                placeholder aggregations.
        """
        self.executor = executor
        self.is_base_dc = is_base_dc

    def run_all(self, import_names: List[str]) -> None:
        """Runs all place aggregations in sequence.

        Args:
            import_names: Names of the imports being processed. When empty,
                nothing is executed and a skip message is logged.
        """
        if not import_names:
            logging.info("No imports specified. Skipping place aggregations.")
            return

        # Lazy %-formatting: the message is only built if INFO is enabled.
        logging.info("Running place aggregations for imports: %s",
                     import_names)
        self.aggregate_us_population_from_states(import_names)
        self.aggregate_us_state_population_from_counties(import_names)

    def aggregate_us_population_from_states(self,
                                            import_names: List[str]) -> None:
        """Calculates US country population by summing up populations of its states.

        Currently executes a placeholder query with hard-coded values.

        Args:
            import_names: Names of the imports being processed. When empty,
                the method returns without executing anything.
        """
        # Guard kept so the method is safe to call directly, not only
        # through run_all().
        if not import_names:
            return

        # The executor's Spanner destination, connection id and a quoted
        # import list used to be computed here but were never used by the
        # query; they are omitted until the real aggregation needs them.
        logging.info("Running placeholder US population aggregation query...")
        self.executor.execute(self._US_FROM_STATES_QUERY)

    def aggregate_us_state_population_from_counties(
            self, import_names: List[str]) -> None:
        """Calculates US state populations by summing up populations of their counties.

        Currently executes a placeholder query with hard-coded values.

        Args:
            import_names: Names of the imports being processed. When empty,
                the method returns without executing anything.
        """
        # Guard kept so the method is safe to call directly, not only
        # through run_all().
        if not import_names:
            return

        # See aggregate_us_population_from_states: no unused executor
        # lookups are performed for the placeholder query.
        logging.info(
            "Running placeholder State population aggregation query...")
        self.executor.execute(self._US_STATES_FROM_COUNTIES_QUERY)