diff --git a/changelog_entry.yaml b/changelog_entry.yaml index 0f82eb65c..591f325d1 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -1,4 +1,4 @@ -- bump: patch +- bump: minor changes: - fixed: - - Versioning workflow checkout for push events + added: + - tests to verify SparseMatrixBuilder correctly calculates variables and constraints into the calibration matrix. diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 19ee9249a..27a41bec7 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -15,7 +15,6 @@ from microimpute.models.qrf import QRF import logging - test_lite = os.environ.get("TEST_LITE") == "true" print(f"TEST_LITE == {test_lite}") diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 8bbe67bcc..4eb0a660b 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -22,7 +22,6 @@ from pathlib import Path import logging - try: import torch except ImportError: diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/calibration_utils.py b/policyengine_us_data/datasets/cps/local_area_calibration/calibration_utils.py index c2e2a08f0..aa954aba1 100644 --- a/policyengine_us_data/datasets/cps/local_area_calibration/calibration_utils.py +++ b/policyengine_us_data/datasets/cps/local_area_calibration/calibration_utils.py @@ -17,7 +17,6 @@ StateCode, ) - # State/Geographic Mappings STATE_CODES = { 1: "AL", @@ -193,15 +192,21 @@ def get_calculated_variables(sim) -> List[str]: """ Return variables that should be cleared for state-swap recalculation. - Includes variables with formulas, adds, or subtracts. - - Excludes ID variables (person_id, household_id, etc.) because: - 1. They have formulas that generate sequential IDs (0, 1, 2, ...) - 2. We need the original H5 values, not regenerated sequences - 3. PolicyEngine's random() function uses entity IDs as seeds: - seed = abs(entity_id * 100 + count_random_calls) - If IDs change, random-dependent variables (SSI resource test, - WIC nutritional risk, WIC takeup) produce different results. + Includes variables with formulas, or adds/subtracts that are lists. + + Excludes: + 1. ID variables (person_id, household_id, etc.) - needed for random seeds + 2. Variables with string adds/subtracts (parameter paths) - these are + pseudo-inputs stored in H5 that would recalculate differently using + parameter lookups. Examples: pre_tax_contributions. + 3. Variables in input_variables (have stored H5 values) even if they + have formulas - the stored values represent original survey data + that should be preserved. Examples: cdcc_relevant_expenses, rent. + + The exclusions are critical because: + - The H5 file stores pre-computed values from original CPS processing + - If deleted, recalculation produces different values, corrupting + downstream calculations like income_tax """ exclude_ids = { "person_id", @@ -211,16 +216,36 @@ def get_calculated_variables(sim) -> List[str]: "family_id", "marital_unit_id", } - return [ - name - for name, var in sim.tax_benefit_system.variables.items() - if ( - var.formulas - or getattr(var, "adds", None) - or getattr(var, "subtracts", None) - ) - and name not in exclude_ids - ] + + # Get stored input variables to exclude + input_vars = set(sim.input_variables) + + result = [] + for name, var in sim.tax_benefit_system.variables.items(): + if name in exclude_ids: + continue + + # Exclude variables that have stored values (input_variables) + # These represent original survey data that should be preserved + if name in input_vars: + continue + + # Include if has formulas + if var.formulas: + result.append(name) + continue + + # Include if adds/subtracts is a list (explicit component aggregation) + # Exclude if adds/subtracts is a string (parameter path - pseudo-input) + adds = getattr(var, "adds", None) + subtracts = getattr(var, "subtracts", None) + + if adds and isinstance(adds, list): + result.append(name) + elif subtracts and isinstance(subtracts, list): + result.append(name) + + return result def get_pseudo_input_variables(sim) -> set: diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/matrix_tracer.py b/policyengine_us_data/datasets/cps/local_area_calibration/matrix_tracer.py index e7cbf57b4..4823de1ef 100644 --- a/policyengine_us_data/datasets/cps/local_area_calibration/matrix_tracer.py +++ b/policyengine_us_data/datasets/cps/local_area_calibration/matrix_tracer.py @@ -46,7 +46,6 @@ create_target_groups, ) - logger = logging.getLogger(__name__) diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/sparse_matrix_builder.py b/policyengine_us_data/datasets/cps/local_area_calibration/sparse_matrix_builder.py index 3af0a8d86..b12629fba 100644 --- a/policyengine_us_data/datasets/cps/local_area_calibration/sparse_matrix_builder.py +++ b/policyengine_us_data/datasets/cps/local_area_calibration/sparse_matrix_builder.py @@ -38,6 +38,105 @@ def __init__( self.time_period = time_period self.cds_to_calibrate = cds_to_calibrate self.dataset_path = dataset_path + self._entity_rel_cache = None + + def _build_entity_relationship(self, sim) -> pd.DataFrame: + """ + Build entity relationship DataFrame mapping persons to all entity IDs. + + This is used to evaluate constraints at the person level and then + aggregate to household level, handling variables defined at different + entity levels (person, tax_unit, household, spm_unit). + + Returns: + DataFrame with person_id, household_id, tax_unit_id, spm_unit_id + """ + if self._entity_rel_cache is not None: + return self._entity_rel_cache + + self._entity_rel_cache = pd.DataFrame( + { + "person_id": sim.calculate( + "person_id", map_to="person" + ).values, + "household_id": sim.calculate( + "household_id", map_to="person" + ).values, + "tax_unit_id": sim.calculate( + "tax_unit_id", map_to="person" + ).values, + "spm_unit_id": sim.calculate( + "spm_unit_id", map_to="person" + ).values, + } + ) + return self._entity_rel_cache + + def _evaluate_constraints_entity_aware( + self, state_sim, constraints: List[dict], n_households: int + ) -> np.ndarray: + """ + Evaluate non-geographic constraints at person level, aggregate to + household level using .any(). + + This properly handles constraints on variables defined at different + entity levels (e.g., tax_unit_is_filer at tax_unit level). Instead of + summing values at household level (which would give 2, 3, etc. for + households with multiple tax units), we evaluate at person level and + use .any() aggregation ("does this household have at least one person + satisfying all constraints?"). + + Args: + state_sim: Microsimulation with state_fips set + constraints: List of constraint dicts with variable, operation, + value keys (geographic constraints should be pre-filtered) + n_households: Number of households + + Returns: + Boolean mask array of length n_households + """ + if not constraints: + return np.ones(n_households, dtype=bool) + + entity_rel = self._build_entity_relationship(state_sim) + n_persons = len(entity_rel) + + person_mask = np.ones(n_persons, dtype=bool) + + for c in constraints: + var = c["variable"] + op = c["operation"] + val = c["value"] + + # Calculate constraint variable at person level + constraint_values = state_sim.calculate( + var, map_to="person" + ).values + + # Apply operation at person level + person_mask &= apply_op(constraint_values, op, val) + + # Aggregate to household level using .any() + # "At least one person in this household satisfies ALL constraints" + entity_rel_with_mask = entity_rel.copy() + entity_rel_with_mask["satisfies"] = person_mask + + household_mask_series = entity_rel_with_mask.groupby("household_id")[ + "satisfies" + ].any() + + # Ensure we return a mask aligned with household order + household_ids = state_sim.calculate( + "household_id", map_to="household" + ).values + household_mask = np.array( + [ + household_mask_series.get(hh_id, False) + for hh_id in household_ids + ] + ) + + return household_mask def _query_targets(self, target_filter: dict) -> pd.DataFrame: """Query targets based on filter criteria using OR logic.""" @@ -166,6 +265,9 @@ def build_matrix( cds_by_state[state].append((cd_idx, cd)) for state, cd_list in cds_by_state.items(): + # Clear entity relationship cache when creating new simulation + self._entity_rel_cache = None + if self.dataset_path: state_sim = self._create_state_sim(state, n_households) else: @@ -184,35 +286,43 @@ def build_matrix( for row_idx, (_, target) in enumerate(targets_df.iterrows()): constraints = self._get_constraints(target["stratum_id"]) - mask = np.ones(n_households, dtype=bool) + geo_constraints = [] + non_geo_constraints = [] for c in constraints: + if c["variable"] in ( + "state_fips", + "congressional_district_geoid", + ): + geo_constraints.append(c) + else: + non_geo_constraints.append(c) + + # Check geographic constraints first (quick fail) + geo_mask = np.ones(n_households, dtype=bool) + for c in geo_constraints: if c["variable"] == "congressional_district_geoid": if ( c["operation"] in ("==", "=") and c["value"] != cd ): - mask[:] = False + geo_mask[:] = False elif c["variable"] == "state_fips": if ( c["operation"] in ("==", "=") and int(c["value"]) != state ): - mask[:] = False - else: - try: - values = state_sim.calculate( - c["variable"], map_to="household" - ).values - mask &= apply_op( - values, c["operation"], c["value"] - ) - except Exception as e: - # Variable may not exist or may not be - # calculable at household level - skip - logger.debug( - f"Could not evaluate constraint " - f"{c['variable']}: {e}" - ) + geo_mask[:] = False + + if not geo_mask.any(): + continue + + # Evaluate non-geographic constraints at entity level + entity_mask = self._evaluate_constraints_entity_aware( + state_sim, non_geo_constraints, n_households + ) + + # Combine geographic and entity-aware masks + mask = geo_mask & entity_mask if not mask.any(): continue diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index 9d605acaa..c90255e3f 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -15,7 +15,6 @@ create_policyengine_uprating_factors_table, ) - rng = np.random.default_rng(seed=64) # Get Qualified Business Income simulation parameters --- diff --git a/policyengine_us_data/datasets/puf/uprate_puf.py b/policyengine_us_data/datasets/puf/uprate_puf.py index 1cf0eb9c6..961446156 100644 --- a/policyengine_us_data/datasets/puf/uprate_puf.py +++ b/policyengine_us_data/datasets/puf/uprate_puf.py @@ -2,7 +2,6 @@ import numpy as np from policyengine_us_data.storage import STORAGE_FOLDER - ITMDED_GROW_RATE = 0.02 # annual growth rate in itemized deduction amounts USE_VARIABLE_SPECIFIC_POPULATION_GROWTH_DIVISORS = False diff --git a/policyengine_us_data/db/create_database_tables.py b/policyengine_us_data/db/create_database_tables.py index df03772d0..920d1449e 100644 --- a/policyengine_us_data/db/create_database_tables.py +++ b/policyengine_us_data/db/create_database_tables.py @@ -15,7 +15,6 @@ from policyengine_us_data.storage import STORAGE_FOLDER - logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", diff --git a/policyengine_us_data/db/etl_age.py b/policyengine_us_data/db/etl_age.py index bb83067c4..d80faf065 100644 --- a/policyengine_us_data/db/etl_age.py +++ b/policyengine_us_data/db/etl_age.py @@ -11,7 +11,6 @@ ) from policyengine_us_data.utils.census import get_census_docs, pull_acs_table - LABEL_TO_SHORT = { "Estimate!!Total!!Total population!!AGE!!Under 5 years": "0-4", "Estimate!!Total!!Total population!!AGE!!5 to 9 years": "5-9", diff --git a/policyengine_us_data/db/etl_irs_soi.py b/policyengine_us_data/db/etl_irs_soi.py index 786abb1cc..6607a5dd6 100644 --- a/policyengine_us_data/db/etl_irs_soi.py +++ b/policyengine_us_data/db/etl_irs_soi.py @@ -24,7 +24,6 @@ get_district_mapping, ) - """See the 22incddocguide.docx manual from the IRS SOI""" # Let's make this work with strict inequalities # Language in the doc: '$10,000 under $25,000' diff --git a/policyengine_us_data/db/validate_database.py b/policyengine_us_data/db/validate_database.py index fee6a49dc..53ac09852 100644 --- a/policyengine_us_data/db/validate_database.py +++ b/policyengine_us_data/db/validate_database.py @@ -9,7 +9,6 @@ import pandas as pd from policyengine_us.system import system - conn = sqlite3.connect("policyengine_us_data/storage/policy_data.db") stratum_constraints_df = pd.read_sql("SELECT * FROM stratum_constraints", conn) diff --git a/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py b/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py index 349e6fbdd..1830bdb3a 100644 --- a/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py +++ b/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py @@ -9,7 +9,6 @@ STATE_NAME_TO_ABBREV, ) - STATE_NAME_TO_FIPS = { "Alabama": "01", "Alaska": "02", diff --git a/policyengine_us_data/tests/test_datasets/test_county_fips.py b/policyengine_us_data/tests/test_datasets/test_county_fips.py index ad1f10c5c..d692cf559 100644 --- a/policyengine_us_data/tests/test_datasets/test_county_fips.py +++ b/policyengine_us_data/tests/test_datasets/test_county_fips.py @@ -10,7 +10,6 @@ LOCAL_FOLDER, ) - # Sample data that mimics the format from census.gov SAMPLE_CENSUS_DATA = """STATE|STATEFP|COUNTYFP|COUNTYNAME AL|01|001|Autauga County diff --git a/policyengine_us_data/tests/test_local_area_calibration/conftest.py b/policyengine_us_data/tests/test_local_area_calibration/conftest.py index 04d6d7f52..7abcbafbf 100644 --- a/policyengine_us_data/tests/test_local_area_calibration/conftest.py +++ b/policyengine_us_data/tests/test_local_area_calibration/conftest.py @@ -1,4 +1,7 @@ -"""Shared fixtures for local area calibration tests.""" +"""Shared fixtures for local area calibration tests. + +Importantly, this file determines which variables will be included in the sparse matrix and calibrating routine. +""" import pytest import numpy as np @@ -16,6 +19,36 @@ get_calculated_variables, ) +# Variables to test for state-level value matching (CI uses subset for speed) +# Format: (variable_name, rtol) +# variable_name as per the targets in policy_data.db +# rtol is relative tolerance for comparison +VARIABLES_TO_TEST = [ + ("snap", 1e-2), + ("income_tax", 1e-2), + ("eitc", 1e-2), +] + +# CI filter config - minimal subset for fast CI runs +# Tests 3 representative variables covering benefits, taxes, and credits +COMBINED_FILTER_CONFIG = { + "stratum_group_ids": [ + 4, # SNAP targets + 117, # Income tax targets + ], + "variables": [ + "snap", + "income_tax", + "eitc", + ], +} + +# Maximum allowed mismatch rate for state-level value comparison +MAX_MISMATCH_RATE = 0.02 + +# Number of samples for cell-level verification tests +N_VERIFICATION_SAMPLES = 500 + @pytest.fixture(scope="module") def db_uri(): @@ -30,7 +63,7 @@ def dataset_path(): @pytest.fixture(scope="module") def test_cds(db_uri): - """CDs from NC, HI, MT, AK (manageable size, multiple same-state CDs).""" + """CDs from NC, HI, MT, AK (manageable size for CI, multiple same-state CDs).""" engine = create_engine(db_uri) query = """ SELECT DISTINCT sc.value as cd_geoid @@ -58,7 +91,7 @@ def sim(dataset_path): @pytest.fixture(scope="module") def matrix_data(db_uri, dataset_path, test_cds, sim): - """Build sparse matrix, return (targets_df, X_sparse, household_id_mapping).""" + """Build sparse matrix with all configured variables.""" builder = SparseMatrixBuilder( db_uri, time_period=2023, @@ -66,7 +99,7 @@ def matrix_data(db_uri, dataset_path, test_cds, sim): dataset_path=dataset_path, ) targets_df, X_sparse, household_id_mapping = builder.build_matrix( - sim, target_filter={"stratum_group_ids": [4], "variables": ["snap"]} + sim, target_filter=COMBINED_FILTER_CONFIG ) return targets_df, X_sparse, household_id_mapping diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_cross_state.py b/policyengine_us_data/tests/test_local_area_calibration/test_cross_state.py index ea9eca6f5..f3615e308 100644 --- a/policyengine_us_data/tests/test_local_area_calibration/test_cross_state.py +++ b/policyengine_us_data/tests/test_local_area_calibration/test_cross_state.py @@ -2,17 +2,19 @@ import pytest import numpy as np +from collections import defaultdict from policyengine_us import Microsimulation from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import ( get_calculated_variables, ) +from .conftest import VARIABLES_TO_TEST, N_VERIFICATION_SAMPLES + def test_cross_state_matches_swapped_sim( X_sparse, targets_df, - tracer, test_cds, dataset_path, n_households, @@ -25,8 +27,10 @@ def test_cross_state_matches_swapped_sim( When household moves to different state, X_sparse should contain the value calculated from a fresh simulation with state_fips set to destination state. + + Uses stratified sampling to ensure all variables in VARIABLES_TO_TEST + are covered with approximately equal samples per variable. """ - n_samples = 200 seed = 42 rng = np.random.default_rng(seed) n_hh = n_households @@ -48,28 +52,46 @@ def get_state_sim(state): nonzero_rows, nonzero_cols = X_sparse.nonzero() - cross_state_indices = [] + # Group cross-state cells by variable for stratified sampling + variable_to_indices = defaultdict(list) + variables_to_test = {v[0] for v in VARIABLES_TO_TEST} + for i in range(len(nonzero_rows)): + row_idx = nonzero_rows[i] col_idx = nonzero_cols[i] cd_idx = col_idx // n_hh hh_idx = col_idx % n_hh cd = test_cds[cd_idx] dest_state = int(cd) // 100 orig_state = int(hh_states[hh_idx]) - if dest_state != orig_state: - cross_state_indices.append(i) - if not cross_state_indices: - pytest.skip("No cross-state non-zero cells found") + # Only include cross-state cells + if dest_state == orig_state: + continue + + # Get variable for this row + variable = targets_df.iloc[row_idx]["variable"] + if variable in variables_to_test: + variable_to_indices[variable].append(i) + + if not variable_to_indices: + pytest.skip("No cross-state non-zero cells found for test variables") - sample_idx = rng.choice( - cross_state_indices, - min(n_samples, len(cross_state_indices)), - replace=False, + # Stratified sampling: sample proportionally from each variable + samples_per_var = max( + 1, N_VERIFICATION_SAMPLES // len(variable_to_indices) ) + sample_indices = [] + + for variable, indices in variable_to_indices.items(): + n_to_sample = min(samples_per_var, len(indices)) + sampled = rng.choice(indices, n_to_sample, replace=False) + sample_indices.extend(sampled) + errors = [] + variables_tested = set() - for idx in sample_idx: + for idx in sample_indices: row_idx = nonzero_rows[idx] col_idx = nonzero_cols[idx] cd_idx = col_idx // n_hh @@ -83,6 +105,8 @@ def get_state_sim(state): state_sim.calculate(variable, map_to="household").values[hh_idx] ) + variables_tested.add(variable) + if not np.isclose(actual, expected, atol=0.5): errors.append( { @@ -95,7 +119,13 @@ def get_state_sim(state): } ) + # Report which variables were tested + missing_vars = variables_to_test - variables_tested + if missing_vars: + print(f"Warning: No cross-state cells found for: {missing_vars}") + assert not errors, ( - f"Cross-state verification failed: {len(errors)}/{len(sample_idx)} " - f"mismatches. First 5: {errors[:5]}" + f"Cross-state verification failed: {len(errors)}/{len(sample_indices)} " + f"mismatches across {len(variables_tested)} variables. " + f"First 5: {errors[:5]}" ) diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_matrix_national_variation.py b/policyengine_us_data/tests/test_local_area_calibration/test_matrix_national_variation.py new file mode 100644 index 000000000..b59500894 --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/test_matrix_national_variation.py @@ -0,0 +1,485 @@ +""" +Tests for correctness in the sparse matrix builder, particularly for national level contributions. + +These tests verify that: +1. Matrix shape and structure are correct +2. Variable aggregation (person to household) preserves totals +3. National-level targets receive contributions from all states (no geographic + bias) +4. Cross-state recalculation applies state-specific rules +""" + +import pytest +import numpy as np +import pandas as pd +from policyengine_us_data.datasets.cps.local_area_calibration.sparse_matrix_builder import ( + SparseMatrixBuilder, +) + +from .conftest import ( + VARIABLES_TO_TEST, + COMBINED_FILTER_CONFIG, +) + +# Variables with state-specific variation (e.g., SNAP eligibility) +VARIABLES_WITH_STATE_VARIATION = [ + "snap", +] + + +@pytest.fixture(scope="module") +def builder(db_uri, dataset_path, test_cds): + """SparseMatrixBuilder configured with test CDs.""" + return SparseMatrixBuilder( + db_uri=db_uri, + time_period=2023, + cds_to_calibrate=test_cds, + dataset_path=dataset_path, + ) + + +def _get_geo_level(geo_id) -> str: + """Determine geographic level from geographic_id.""" + if geo_id == "US": + return "national" + try: + val = int(geo_id) + if 1 <= val <= 56: + return "state" + else: + return "district" + except (ValueError, TypeError): + return "unknown" + + +def test_person_level_aggregation_preserves_totals(sim): + """Health insurance premiums (person-level) sum correctly to household.""" + var = "health_insurance_premiums_without_medicare_part_b" + person_total = sim.calculate(var, 2023, map_to="person").values.sum() + household_total = sim.calculate(var, 2023, map_to="household").values.sum() + assert np.isclose(person_total, household_total, rtol=1e-6) + + +def test_matrix_shape(sim, builder): + """Matrix should have (n_targets, n_households * n_cds) shape.""" + targets_df, X_sparse, _ = builder.build_matrix( + sim, + target_filter={ + "variables": ["health_insurance_premiums_without_medicare_part_b"] + }, + ) + n_households = len( + sim.calculate("household_id", map_to="household").values + ) + n_cds = len(builder.cds_to_calibrate) + assert X_sparse.shape[1] == n_households * n_cds + + +def test_combined_variables_in_matrix(sim, builder): + """Matrix should include all configured variables.""" + targets_df, X_sparse, _ = builder.build_matrix( + sim, + target_filter=COMBINED_FILTER_CONFIG, + ) + variables = targets_df["variable"].unique() + + for var_name, _ in VARIABLES_TO_TEST: + assert var_name in variables, f"Missing variable: {var_name}" + + +class TestNationalLevelContributions: + """ + Tests verifying that national-level targets receive contributions from + households across all states, not just a geographic subset. + + The key insight: for a national target, when we look at a single CD's + column block, households from ALL original states should potentially + contribute (subject to meeting eligibility constraints). There should + be no systematic geographic bias where only households from certain + states contribute to the national total. + """ + + def test_national_targets_receive_multistate_contributions( + self, targets_df, X_sparse, household_states, n_households, test_cds + ): + """ + Verify that national-level targets have contributions from households + originally from multiple states. + + For each national target: + 1. Look at the matrix row + 2. For EACH CD's column block, identify which original states have + non-zero contributions + 3. Verify contributions come from multiple states (not geographically + biased) + """ + state_fips = household_states + cds = test_cds + + # Find national-level targets + national_targets = targets_df[ + targets_df["geographic_id"].apply( + lambda x: _get_geo_level(x) == "national" + ) + ] + + if len(national_targets) == 0: + pytest.skip("No national-level targets found") + + results = [] + + for _, target in national_targets.iterrows(): + row_idx = target.name + variable = target["variable"] + row = X_sparse[row_idx, :].toarray().flatten() + + # For each CD block, check which original states contribute + cd_contribution_stats = [] + + for cd_idx, cd in enumerate(cds): + col_start = cd_idx * n_households + col_end = col_start + n_households + cd_values = row[col_start:col_end] + + # Find households with non-zero values in this CD block + nonzero_mask = cd_values != 0 + nonzero_indices = np.where(nonzero_mask)[0] + + if len(nonzero_indices) == 0: + continue + + # Get original states of contributing households + contributing_states = set(state_fips[nonzero_indices]) + + cd_contribution_stats.append( + { + "cd": cd, + "cd_state": int(cd) // 100, + "n_contributing": len(nonzero_indices), + "n_states": len(contributing_states), + "contributing_states": contributing_states, + } + ) + + if not cd_contribution_stats: + results.append( + { + "variable": variable, + "status": "NO_CONTRIBUTIONS", + "details": "No non-zero values in any CD block", + } + ) + continue + + # Aggregate stats + stats_df = pd.DataFrame(cd_contribution_stats) + avg_states = stats_df["n_states"].mean() + min_states = stats_df["n_states"].min() + + # Check: on average, contributions should come from multiple states + # (at least 2, since we have CDs from 4 different states) + passed = avg_states >= 2 and min_states >= 1 + + results.append( + { + "variable": variable, + "status": "PASSED" if passed else "FAILED", + "avg_contributing_states": avg_states, + "min_contributing_states": min_states, + "n_cd_blocks_with_data": len(stats_df), + } + ) + + # Assert no geographic bias + failed = [r for r in results if r["status"] == "FAILED"] + assert len(failed) == 0, ( + f"Geographic bias detected in national targets: " + f"{[r['variable'] for r in failed]}" + ) + + def test_state_distribution_in_national_targets( + self, targets_df, X_sparse, household_states, n_households, test_cds + ): + """ + Verify the distribution of contributing states in national targets + roughly matches the original data distribution. + + This catches cases where one state dominates the contributions + disproportionately. + """ + state_fips = household_states + cds = test_cds + + # Get original state distribution (count of households per state) + unique_states, original_counts = np.unique( + state_fips, return_counts=True + ) + original_dist = dict(zip(unique_states, original_counts)) + total_hh = len(state_fips) + + # Find national-level targets + national_targets = targets_df[ + targets_df["geographic_id"].apply( + lambda x: _get_geo_level(x) == "national" + ) + ] + + if len(national_targets) == 0: + pytest.skip("No national-level targets found") + + for _, target in national_targets.iterrows(): + row_idx = target.name + variable = target["variable"] + row = X_sparse[row_idx, :].toarray().flatten() + + # Count contributions by original state across ALL CD blocks + state_contribution_counts = {} + + for cd_idx, cd in enumerate(cds): + col_start = cd_idx * n_households + col_end = col_start + n_households + cd_values = row[col_start:col_end] + + nonzero_mask = cd_values != 0 + nonzero_indices = np.where(nonzero_mask)[0] + + for hh_idx in nonzero_indices: + orig_state = state_fips[hh_idx] + state_contribution_counts[orig_state] = ( + state_contribution_counts.get(orig_state, 0) + 1 + ) + + if not state_contribution_counts: + continue + + # Check that no single state dominates excessively + total_contributions = sum(state_contribution_counts.values()) + max_contribution = max(state_contribution_counts.values()) + max_state = max( + state_contribution_counts, key=state_contribution_counts.get + ) + max_share = max_contribution / total_contributions + + # The max share should not exceed 70% (unless that state has 70%+ + # of households in the original data) + original_max_share = original_dist.get(max_state, 0) / total_hh + + # Allow 20% margin above original share + threshold = min(0.7, original_max_share + 0.2) + + assert max_share <= threshold, ( + f"State {max_state} dominates national {variable} target with " + f"{max_share:.1%} of contributions " + f"(original share: {original_max_share:.1%})" + ) + + +class TestCrossStateRecalculation: + """ + Tests verifying that household values change when borrowed to different + states, confirming state-specific rules are being applied. + + The key insight: for national-level targets (no state constraint), each + household appears in every CD block. The value in each CD block represents + what the variable would be if that household lived in that CD's state. + For state-dependent variables (like SNAP), values should differ across + states for at least some households. + + NOTE: This complements test_cross_state.py which verifies exact values. + These tests verify that variation exists (state rules are applied). + """ + + def test_values_change_across_states_for_national_targets( + self, targets_df, X_sparse, n_households, test_cds + ): + """ + Verify that for national targets, household values vary across CD + blocks from different states. + + This confirms the matrix builder is correctly recalculating variables + with state-specific rules when households are "borrowed" to different + geographic areas. + + The test checks: + 1. For each national target, examine households with non-zero values + 2. Compare each household's value across CD blocks from different states + 3. At least some households should have different values in different + states (confirming recalculation with different state rules) + """ + cds = test_cds + + # Group CDs by state + cds_by_state = {} + for cd_idx, cd in enumerate(cds): + state = int(cd) // 100 + if state not in cds_by_state: + cds_by_state[state] = [] + cds_by_state[state].append((cd_idx, cd)) + + states = list(cds_by_state.keys()) + if len(states) < 2: + pytest.skip("Need at least 2 states to test cross-state variation") + + # Find national-level targets + national_targets = targets_df[ + targets_df["geographic_id"].apply( + lambda x: _get_geo_level(x) == "national" + ) + ] + + if len(national_targets) == 0: + pytest.skip("No national-level targets found") + + results = [] + + for _, target in national_targets.iterrows(): + if target["variable"] not in VARIABLES_WITH_STATE_VARIATION: + continue + row_idx = target.name + variable = target["variable"] + row = X_sparse[row_idx, :].toarray().flatten() + + # For each household, collect values from different states + households_with_variation = 0 + households_checked = 0 + + # Sample households (check every 10th to keep test fast) + for hh_idx in range(0, n_households, 10): + # Get this household's value in each state (use first CD of + # each state) + state_values = {} + for state, cd_list in cds_by_state.items(): + cd_idx, _ = cd_list[0] # First CD in this state + col_idx = cd_idx * n_households + hh_idx + state_values[state] = row[col_idx] + + # Skip if all values are zero (household doesn't qualify for + # this variable) + nonzero_values = [v for v in state_values.values() if v != 0] + if len(nonzero_values) < 2: + continue + + households_checked += 1 + + # Check if values differ across states + unique_values = set(nonzero_values) + if len(unique_values) > 1: + households_with_variation += 1 + + variation_rate = ( + households_with_variation / households_checked + if households_checked > 0 + else 0 + ) + + results.append( + { + "variable": variable, + "households_checked": households_checked, + "households_with_variation": households_with_variation, + "variation_rate": variation_rate, + } + ) + + # For state-dependent variables, we expect SOME variation + # (not all households will vary - some may have $0 or max benefits + # regardless of state) + # The key is that variation exists, confirming recalculation occurs + for r in results: + if r["households_checked"] > 0: + # At least 10% of households should show variation for + # state-dependent variables + assert ( + r["variation_rate"] > 0.1 or r["households_checked"] < 10 + ), ( + f"No cross-state variation found for {r['variable']}. " + f"This suggests state-specific rules may not be applied " + f"when households are borrowed to different states." + ) + + def test_same_household_different_states_shows_rule_changes( + self, targets_df, X_sparse, household_states, n_households, test_cds + ): + """ + Deep dive test: pick specific households and verify their values + differ across states in a way consistent with state-specific rules. + + For SNAP specifically, different states have different: + - Standard deductions + - Shelter deduction caps + - Vehicle allowances + - Categorical eligibility rules + + This test finds households where we can verify the recalculation + is applying different state rules. + """ + state_fips_orig = household_states + cds = test_cds + + # Group CDs by state + cds_by_state = {} + for cd_idx, cd in enumerate(cds): + state = int(cd) // 100 + if state not in cds_by_state: + cds_by_state[state] = [] + cds_by_state[state].append((cd_idx, cd)) + + states = sorted(cds_by_state.keys()) + if len(states) < 2: + pytest.skip("Need at least 2 states") + + # Find national SNAP target (most state-dependent) + snap_national = targets_df[ + (targets_df["variable"] == "snap") + & ( + targets_df["geographic_id"].apply( + lambda x: _get_geo_level(x) == "national" + ) + ) + ] + + if len(snap_national) == 0: + pytest.skip("No national SNAP target found") + + row_idx = snap_national.iloc[0].name + row = X_sparse[row_idx, :].toarray().flatten() + + # Find households with interesting variation patterns + example_households = [] + + for hh_idx in range(n_households): + state_values = {} + for state, cd_list in cds_by_state.items(): + cd_idx, _ = cd_list[0] + col_idx = cd_idx * n_households + hh_idx + state_values[state] = row[col_idx] + + # Look for households where: + # 1. At least 2 states have non-zero SNAP + # 2. The values differ significantly (>10% relative difference) + nonzero_states = {s: v for s, v in state_values.items() if v > 0} + + if len(nonzero_states) >= 2: + values = list(nonzero_states.values()) + max_val = max(values) + min_val = min(values) + if min_val > 0 and (max_val - min_val) / min_val > 0.1: + example_households.append( + { + "hh_idx": hh_idx, + "original_state": state_fips_orig[hh_idx], + "state_values": nonzero_states, + "max_val": max_val, + "min_val": min_val, + "variation": (max_val - min_val) / min_val, + } + ) + + if len(example_households) >= 5: + break + + # Assert we found at least one household with variation + assert len(example_households) > 0, ( + "Expected to find households with >10% SNAP variation across " + "states, confirming state-specific rules are applied" + ) diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_same_state.py b/policyengine_us_data/tests/test_local_area_calibration/test_same_state.py index a13f459d8..ec9200b32 100644 --- a/policyengine_us_data/tests/test_local_area_calibration/test_same_state.py +++ b/policyengine_us_data/tests/test_local_area_calibration/test_same_state.py @@ -1,99 +1,123 @@ -"""Test same-state values match fresh simulations.""" +"""Test same-state values match original simulation values.""" import pytest import numpy as np +from collections import defaultdict -from policyengine_us import Microsimulation -from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import ( - get_calculated_variables, -) +from .conftest import VARIABLES_TO_TEST, N_VERIFICATION_SAMPLES def test_same_state_matches_original( + sim, X_sparse, targets_df, - tracer, - sim, test_cds, - dataset_path, n_households, household_ids, household_states, ): """ - Same-state non-zero cells must match fresh same-state simulation. + Same-state non-zero cells must match ORIGINAL simulation values. When household stays in same state, X_sparse should contain the value - calculated from a fresh simulation with state_fips set to that state. + from the original simulation (ground truth from H5 dataset). + + Uses stratified sampling to ensure all variables in VARIABLES_TO_TEST + are covered with approximately equal samples per variable. """ - n_samples = 200 seed = 42 rng = np.random.default_rng(seed) n_hh = n_households hh_ids = household_ids hh_states = household_states - state_sims = {} - - def get_state_sim(state): - if state not in state_sims: - s = Microsimulation(dataset=dataset_path) - s.set_input( - "state_fips", 2023, np.full(n_hh, state, dtype=np.int32) - ) - for var in get_calculated_variables(s): - s.delete_arrays(var) - state_sims[state] = s - return state_sims[state] - nonzero_rows, nonzero_cols = X_sparse.nonzero() - same_state_indices = [] + # Group same-state cells by variable for stratified sampling + variable_to_indices = defaultdict(list) + variables_to_test = {v[0] for v in VARIABLES_TO_TEST} + for i in range(len(nonzero_rows)): + row_idx = nonzero_rows[i] col_idx = nonzero_cols[i] cd_idx = col_idx // n_hh hh_idx = col_idx % n_hh cd = test_cds[cd_idx] dest_state = int(cd) // 100 orig_state = int(hh_states[hh_idx]) - if dest_state == orig_state: - same_state_indices.append(i) - if not same_state_indices: - pytest.skip("No same-state non-zero cells found") + # Only include same-state cells + if dest_state != orig_state: + continue + + variable = targets_df.iloc[row_idx]["variable"] + if variable in variables_to_test: + variable_to_indices[variable].append(i) + + if not variable_to_indices: + pytest.skip("No same-state non-zero cells found for test variables") - sample_idx = rng.choice( - same_state_indices, - min(n_samples, len(same_state_indices)), - replace=False, + # Stratified sampling: sample proportionally from each variable + samples_per_var = max( + 1, N_VERIFICATION_SAMPLES // len(variable_to_indices) ) + sample_indices = [] + + for variable, indices in variable_to_indices.items(): + n_to_sample = min(samples_per_var, len(indices)) + sampled = rng.choice(indices, n_to_sample, replace=False) + sample_indices.extend(sampled) + + # Cache original values per variable to avoid repeated calculations + original_values_cache = {} + + def get_original_values(variable): + if variable not in original_values_cache: + original_values_cache[variable] = sim.calculate( + variable, map_to="household" + ).values + return original_values_cache[variable] + errors = [] + variables_tested = set() - for idx in sample_idx: + for idx in sample_indices: row_idx = nonzero_rows[idx] col_idx = nonzero_cols[idx] cd_idx = col_idx // n_hh hh_idx = col_idx % n_hh - cd = test_cds[cd_idx] - dest_state = int(cd) // 100 variable = targets_df.iloc[row_idx]["variable"] actual = float(X_sparse[row_idx, col_idx]) - state_sim = get_state_sim(dest_state) - expected = float( - state_sim.calculate(variable, map_to="household").values[hh_idx] - ) + + # Compare to ORIGINAL simulation values (ground truth) + original_values = get_original_values(variable) + expected = float(original_values[hh_idx]) + + variables_tested.add(variable) if not np.isclose(actual, expected, atol=0.5): errors.append( { "hh_id": hh_ids[hh_idx], + "hh_idx": hh_idx, "variable": variable, "actual": actual, "expected": expected, + "diff": actual - expected, + "rel_diff": ( + (actual - expected) / expected + if expected != 0 + else np.inf + ), } ) + missing_vars = variables_to_test - variables_tested + if missing_vars: + print(f"Warning: No same-state cells found for: {missing_vars}") + assert not errors, ( - f"Same-state verification failed: {len(errors)}/{len(sample_idx)} " - f"mismatches. First 5: {errors[:5]}" + f"Same-state verification failed: {len(errors)}/{len(sample_indices)} " + f"mismatches across {len(variables_tested)} variables. " + f"First 5: {errors[:5]}" ) diff --git a/policyengine_us_data/utils/census.py b/policyengine_us_data/utils/census.py index 2f424ccb8..8081b6162 100644 --- a/policyengine_us_data/utils/census.py +++ b/policyengine_us_data/utils/census.py @@ -4,7 +4,6 @@ import pandas as pd import numpy as np - STATE_NAME_TO_FIPS = { "Alabama": "01", "Alaska": "02", diff --git a/policyengine_us_data/utils/huggingface.py b/policyengine_us_data/utils/huggingface.py index 2860adf3d..a312b5240 100644 --- a/policyengine_us_data/utils/huggingface.py +++ b/policyengine_us_data/utils/huggingface.py @@ -1,7 +1,6 @@ from huggingface_hub import hf_hub_download, login, HfApi import os - TOKEN = os.environ.get("HUGGING_FACE_TOKEN") if not TOKEN: raise ValueError( diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index cbea6dabb..e368d5048 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -9,7 +9,6 @@ from policyengine_core.reforms import Reform from policyengine_us_data.utils.soi import pe_to_soi, get_soi - # CPS-derived statistics # Medical expenses, sum of spm thresholds # Child support expenses diff --git a/policyengine_us_data/utils/spm.py b/policyengine_us_data/utils/spm.py index 070db5339..b2e4538b5 100644 --- a/policyengine_us_data/utils/spm.py +++ b/policyengine_us_data/utils/spm.py @@ -3,7 +3,6 @@ import numpy as np from spm_calculator import SPMCalculator, spm_equivalence_scale - TENURE_CODE_MAP = { 1: "owner_with_mortgage", 2: "owner_without_mortgage", diff --git a/tests/test_h6_reform.py b/tests/test_h6_reform.py index 7253ed97c..e68ed8db3 100644 --- a/tests/test_h6_reform.py +++ b/tests/test_h6_reform.py @@ -11,7 +11,6 @@ import pytest - # Constants from the H6 reform implementation HI_SINGLE = 34_000 HI_JOINT = 44_000