diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index 72594631..813c00d7 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -34,6 +34,7 @@ from policyengine_us_data.utils.takeup import ( SIMPLE_TAKEUP_VARS, apply_block_takeup_to_arrays, + any_person_flag_by_entity, ) CHECKPOINT_FILE = Path("completed_states.txt") @@ -526,6 +527,19 @@ def build_h5( } hh_state_fips = clone_geo["state_fips"].astype(np.int32) original_hh_ids = household_ids[active_hh].astype(np.int64) + reported_anchors = {} + if "reported_has_marketplace_health_coverage_at_interview" in data: + reported_anchors["takes_up_aca_if_eligible"] = any_person_flag_by_entity( + data["person_tax_unit_id"][time_period], + data["tax_unit_id"][time_period], + data["reported_has_marketplace_health_coverage_at_interview"][ + time_period + ], + ) + if "reported_has_means_tested_health_coverage_at_interview" in data: + reported_anchors["takes_up_medicaid_if_eligible"] = data[ + "reported_has_means_tested_health_coverage_at_interview" + ][time_period].astype(bool) takeup_results = apply_block_takeup_to_arrays( hh_blocks=active_blocks, @@ -535,6 +549,7 @@ def build_h5( entity_counts=entity_counts, time_period=time_period, takeup_filter=takeup_filter, + reported_anchors=reported_anchors, ) for var_name, bools in takeup_results.items(): data[var_name] = {time_period: bools} diff --git a/policyengine_us_data/calibration/unified_matrix_builder.py b/policyengine_us_data/calibration/unified_matrix_builder.py index 04d785ff..5d38d0ef 100644 --- a/policyengine_us_data/calibration/unified_matrix_builder.py +++ b/policyengine_us_data/calibration/unified_matrix_builder.py @@ -14,6 +14,7 @@ from collections import defaultdict from typing import Dict, List, Optional, Tuple +import h5py import numpy as np import pandas as pd from scipy import sparse @@ -532,6 +533,7 @@ def _process_single_clone( entity_hh_idx_map = sd.get("entity_hh_idx_map", {}) entity_to_person_idx = sd.get("entity_to_person_idx", {}) precomputed_rates = sd.get("precomputed_rates", {}) + reported_takeup_anchors = sd.get("reported_takeup_anchors", {}) # Slice geography for this clone clone_states = geo_states[col_start:col_end] @@ -594,6 +596,7 @@ def _process_single_clone( precomputed_rates[info["rate_key"]], ent_blocks, ent_hh_ids, + reported_mask=reported_takeup_anchors.get(takeup_var), ) ent_values = (ent_eligible * ent_takeup).astype(np.float32) @@ -1876,6 +1879,7 @@ def build_matrix( from policyengine_us_data.utils.takeup import ( TAKEUP_AFFECTED_TARGETS, compute_block_takeup_for_entities, + any_person_flag_by_entity, ) from policyengine_us_data.parameters import ( load_take_up_rate, @@ -1904,6 +1908,35 @@ def build_matrix( "person": person_hh_indices, } + reported_takeup_anchors = {} + with h5py.File(self.dataset_path, "r") as f: + period_key = str(self.time_period) + if ( + "reported_has_marketplace_health_coverage_at_interview" in f + and period_key + in f["reported_has_marketplace_health_coverage_at_interview"] + ): + person_marketplace = f[ + "reported_has_marketplace_health_coverage_at_interview" + ][period_key][...].astype(bool) + person_tax_unit_ids = f["person_tax_unit_id"][period_key][...] + tax_unit_ids = f["tax_unit_id"][period_key][...] + reported_takeup_anchors["takes_up_aca_if_eligible"] = ( + any_person_flag_by_entity( + person_tax_unit_ids, + tax_unit_ids, + person_marketplace, + ) + ) + if ( + "reported_has_means_tested_health_coverage_at_interview" in f + and period_key + in f["reported_has_means_tested_health_coverage_at_interview"] + ): + reported_takeup_anchors["takes_up_medicaid_if_eligible"] = f[ + "reported_has_means_tested_health_coverage_at_interview" + ][period_key][...].astype(bool) + entity_to_person_idx = {} for entity_level in ("spm_unit", "tax_unit"): ent_ids = sim.calculate( @@ -1940,6 +1973,7 @@ def build_matrix( self.household_ids = household_ids self.precomputed_rates = precomputed_rates self.affected_target_info = affected_target_info + self.reported_takeup_anchors = reported_takeup_anchors # 5d. Clone loop from pathlib import Path @@ -1987,6 +2021,7 @@ def build_matrix( shared_data["entity_hh_idx_map"] = entity_hh_idx_map shared_data["entity_to_person_idx"] = entity_to_person_idx shared_data["precomputed_rates"] = precomputed_rates + shared_data["reported_takeup_anchors"] = reported_takeup_anchors logger.info( "Starting parallel clone processing: %d clones, %d workers", @@ -2124,6 +2159,7 @@ def build_matrix( precomputed_rates[info["rate_key"]], ent_blocks, ent_hh_ids, + reported_mask=reported_takeup_anchors.get(takeup_var), ) ent_values = (ent_eligible * ent_takeup).astype(np.float32) diff --git a/policyengine_us_data/datasets/cps/census_cps.py b/policyengine_us_data/datasets/cps/census_cps.py index 042fefe5..fb97b651 100644 --- a/policyengine_us_data/datasets/cps/census_cps.py +++ b/policyengine_us_data/datasets/cps/census_cps.py @@ -236,7 +236,24 @@ class CensusCPS_2018(CensusCPS): "A_AGE", "A_SEX", "PEDISEYE", + "NOW_COV", + "NOW_DIR", "NOW_MRK", + "NOW_MRKS", + "NOW_MRKUN", + "NOW_NONM", + "NOW_PRIV", + "NOW_PUB", + "NOW_GRP", + "NOW_CAID", + "NOW_MCAID", + "NOW_PCHIP", + "NOW_OTHMT", + "NOW_MCARE", + "NOW_MIL", + "NOW_CHAMPVA", + "NOW_VACARE", + "NOW_IHSFLG", "WSAL_VAL", "INT_VAL", "SEMP_VAL", @@ -294,7 +311,6 @@ class CensusCPS_2018(CensusCPS): "PMED_VAL", "PEMCPREM", "PRCITSHP", - "NOW_GRP", "POCCU2", "PEINUSYR", "MCARE", diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 418d7396..94511cdb 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -16,6 +16,31 @@ import logging from policyengine_us_data.parameters import load_take_up_rate from policyengine_us_data.utils.randomness import seeded_rng +from policyengine_us_data.utils.takeup import ( + any_person_flag_by_entity, + assign_takeup_with_reported_anchors, +) + + +CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = { + "reported_has_direct_purchase_health_coverage_at_interview": "NOW_DIR", + "reported_has_marketplace_health_coverage_at_interview": "NOW_MRK", + "reported_has_subsidized_marketplace_health_coverage_at_interview": "NOW_MRKS", + "reported_has_unsubsidized_marketplace_health_coverage_at_interview": "NOW_MRKUN", + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview": ( + "NOW_NONM" + ), + "reported_has_employer_sponsored_health_coverage_at_interview": "NOW_GRP", + "reported_has_medicare_health_coverage_at_interview": "NOW_MCARE", + "reported_has_medicaid_health_coverage_at_interview": "NOW_CAID", + "reported_has_means_tested_health_coverage_at_interview": "NOW_MCAID", + "reported_has_chip_health_coverage_at_interview": "NOW_PCHIP", + "reported_has_other_means_tested_health_coverage_at_interview": "NOW_OTHMT", + "reported_has_tricare_health_coverage_at_interview": "NOW_MIL", + "reported_has_champva_health_coverage_at_interview": "NOW_CHAMPVA", + "reported_has_va_health_coverage_at_interview": "NOW_VACARE", + "reported_has_indian_health_service_coverage_at_interview": "NOW_IHSFLG", +} class CPS(Dataset): @@ -241,7 +266,16 @@ def add_takeup(self): # ACA rng = seeded_rng("takes_up_aca_if_eligible") - data["takes_up_aca_if_eligible"] = rng.random(n_tax_units) < aca_rate + reported_marketplace_by_tax_unit = any_person_flag_by_entity( + data["person_tax_unit_id"], + data["tax_unit_id"], + data["reported_has_marketplace_health_coverage_at_interview"], + ) + data["takes_up_aca_if_eligible"] = assign_takeup_with_reported_anchors( + rng.random(n_tax_units), + aca_rate, + reported_mask=reported_marketplace_by_tax_unit, + ) # Medicaid: state-specific rates state_codes = baseline.calculate("state_code_str").values @@ -253,8 +287,11 @@ def add_takeup(self): [medicaid_rates_by_state.get(s, 0.93) for s in person_states] ) rng = seeded_rng("takes_up_medicaid_if_eligible") - data["takes_up_medicaid_if_eligible"] = ( - rng.random(n_persons) < medicaid_rate_by_person + data["takes_up_medicaid_if_eligible"] = assign_takeup_with_reported_anchors( + rng.random(n_persons), + medicaid_rate_by_person, + reported_mask=data["reported_has_means_tested_health_coverage_at_interview"], + group_keys=person_states, ) # Head Start @@ -462,9 +499,38 @@ def children_per_parent(col: str) -> pd.DataFrame: ) cps["own_children_in_household"] = tmp.children.fillna(0) - cps["has_marketplace_health_coverage"] = person.NOW_MRK == 1 + for variable, cps_column in CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP.items(): + cps[variable] = person[cps_column] == 1 + + cps["reported_has_private_health_coverage_at_interview"] = person.NOW_PRIV == 1 + cps["reported_has_public_health_coverage_at_interview"] = person.NOW_PUB == 1 + cps["reported_is_insured_at_interview"] = person.NOW_COV == 1 + cps["reported_is_uninsured_at_interview"] = person.NOW_COV != 1 - cps["has_esi"] = person.NOW_GRP == 1 + coverage_families = np.column_stack( + [ + cps["reported_has_employer_sponsored_health_coverage_at_interview"], + cps["reported_has_marketplace_health_coverage_at_interview"], + cps[ + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview" + ], + cps["reported_has_medicare_health_coverage_at_interview"], + cps["reported_has_means_tested_health_coverage_at_interview"], + cps["reported_has_tricare_health_coverage_at_interview"], + cps["reported_has_champva_health_coverage_at_interview"], + cps["reported_has_va_health_coverage_at_interview"], + cps["reported_has_indian_health_service_coverage_at_interview"], + ] + ) + cps["reported_has_multiple_health_coverage_at_interview"] = ( + coverage_families.sum(axis=1) > 1 + ) + + # Legacy aliases retained for compatibility until rules-side names catch up. + cps["has_marketplace_health_coverage"] = cps[ + "reported_has_marketplace_health_coverage_at_interview" + ] + cps["has_esi"] = cps["reported_has_employer_sponsored_health_coverage_at_interview"] cps["cps_race"] = person.PRDTRACE cps["is_hispanic"] = person.PRDTHSP != 0 diff --git a/policyengine_us_data/tests/test_calibration/test_unified_calibration.py b/policyengine_us_data/tests/test_calibration/test_unified_calibration.py index 28a3c906..85d41f38 100644 --- a/policyengine_us_data/tests/test_calibration/test_unified_calibration.py +++ b/policyengine_us_data/tests/test_calibration/test_unified_calibration.py @@ -177,6 +177,22 @@ def test_different_blocks_different_result(self): differs = any(not np.array_equal(r1[v], r2[v]) for v in r1) assert differs + def test_reported_anchors_feed_through_for_aca(self): + args = self._make_arrays(4, 1, 1, 1) + result = apply_block_takeup_to_arrays( + *args, + time_period=2024, + takeup_filter=["takes_up_aca_if_eligible"], + precomputed_rates={"aca": 0.25}, + reported_anchors={ + "takes_up_aca_if_eligible": np.array([True, False, False, False]) + }, + ) + np.testing.assert_array_equal( + result["takes_up_aca_if_eligible"], + [True, False, False, False], + ) + class TestResolveRate: """Verify _resolve_rate handles scalar and dict rates.""" diff --git a/policyengine_us_data/tests/test_datasets/test_cps.py b/policyengine_us_data/tests/test_datasets/test_cps.py index f0346939..5b9fed14 100644 --- a/policyengine_us_data/tests/test_datasets/test_cps.py +++ b/policyengine_us_data/tests/test_datasets/test_cps.py @@ -1,5 +1,72 @@ import pytest import numpy as np +import pandas as pd + + +def test_add_personal_variables_maps_current_health_coverage_flags(): + from policyengine_us_data.datasets.cps.cps import add_personal_variables + + person = pd.DataFrame( + { + "A_AGE": [30, 45], + "A_SEX": [2, 1], + "PEDISEYE": [0, 1], + "PEDISDRS": [0, 0], + "PEDISEAR": [0, 0], + "PEDISOUT": [0, 0], + "PEDISPHY": [0, 0], + "PEDISREM": [0, 0], + "PEPAR1": [0, 0], + "PEPAR2": [0, 0], + "PH_SEQ": [1, 1], + "A_LINENO": [1, 2], + "NOW_COV": [1, 0], + "NOW_DIR": [1, 0], + "NOW_MRK": [1, 0], + "NOW_MRKS": [1, 0], + "NOW_MRKUN": [0, 0], + "NOW_NONM": [0, 0], + "NOW_PRIV": [1, 0], + "NOW_PUB": [0, 1], + "NOW_GRP": [0, 1], + "NOW_CAID": [0, 1], + "NOW_MCAID": [0, 1], + "NOW_PCHIP": [0, 0], + "NOW_OTHMT": [0, 0], + "NOW_MCARE": [0, 0], + "NOW_MIL": [0, 0], + "NOW_CHAMPVA": [0, 0], + "NOW_VACARE": [0, 0], + "NOW_IHSFLG": [0, 0], + "PRDTRACE": [1, 2], + "PRDTHSP": [0, 1], + "A_MARITL": [1, 4], + "A_HSCOL": [0, 2], + "POCCU2": [39, 52], + } + ) + cps = {} + + add_personal_variables(cps, person) + + np.testing.assert_array_equal( + cps["reported_has_marketplace_health_coverage_at_interview"], + [True, False], + ) + np.testing.assert_array_equal( + cps["reported_has_means_tested_health_coverage_at_interview"], + [False, True], + ) + np.testing.assert_array_equal( + cps["reported_is_uninsured_at_interview"], + [False, True], + ) + np.testing.assert_array_equal( + cps["reported_has_multiple_health_coverage_at_interview"], + [False, True], + ) + np.testing.assert_array_equal(cps["has_marketplace_health_coverage"], [True, False]) + np.testing.assert_array_equal(cps["has_esi"], [False, True]) def test_cps_has_auto_loan_interest(): diff --git a/policyengine_us_data/tests/test_stochastic_variables.py b/policyengine_us_data/tests/test_stochastic_variables.py index b9ab1346..383485e1 100644 --- a/policyengine_us_data/tests/test_stochastic_variables.py +++ b/policyengine_us_data/tests/test_stochastic_variables.py @@ -3,6 +3,10 @@ import pytest import numpy as np from policyengine_us_data.parameters import load_take_up_rate +from policyengine_us_data.utils.takeup import ( + any_person_flag_by_entity, + assign_takeup_with_reported_anchors, +) from policyengine_us_data.utils.randomness import ( _stable_string_hash, seeded_rng, @@ -148,3 +152,39 @@ def test_state_specific_medicaid_proportions(self): for state, expected_rate in [("UT", 0.53), ("CO", 0.99)]: take_up = draws[:10_000] < expected_rate assert abs(take_up.mean() - expected_rate) < 0.05 + + +class TestReportedTakeupAnchors: + def test_global_anchor_preserves_reported_and_fills_remaining(self): + draws = np.array([0.9, 0.2, 0.6, 0.9]) + reported = np.array([True, False, False, False]) + result = assign_takeup_with_reported_anchors( + draws, + 0.5, + reported_mask=reported, + ) + np.testing.assert_array_equal(result, [True, True, False, False]) + + def test_grouped_anchor_applies_within_each_group(self): + draws = np.array([0.9, 0.2, 0.1, 0.9]) + rates = np.array([0.5, 0.5, 0.5, 0.5]) + reported = np.array([True, False, False, False]) + groups = np.array(["A", "A", "B", "B"]) + result = assign_takeup_with_reported_anchors( + draws, + rates, + reported_mask=reported, + group_keys=groups, + ) + np.testing.assert_array_equal(result, [True, False, True, False]) + + def test_any_person_flag_by_entity_aggregates_correctly(self): + person_tax_unit_ids = np.array([10, 10, 20, 30]) + tax_unit_ids = np.array([10, 20, 30]) + person_marketplace = np.array([False, True, False, True]) + result = any_person_flag_by_entity( + person_tax_unit_ids, + tax_unit_ids, + person_marketplace, + ) + np.testing.assert_array_equal(result, [True, False, True]) diff --git a/policyengine_us_data/utils/takeup.py b/policyengine_us_data/utils/takeup.py index 5e49b20a..fcc5eb5d 100644 --- a/policyengine_us_data/utils/takeup.py +++ b/policyengine_us_data/utils/takeup.py @@ -164,6 +164,90 @@ } +def any_person_flag_by_entity( + person_entity_ids: np.ndarray, + entity_ids: np.ndarray, + person_mask: np.ndarray, +) -> np.ndarray: + """Aggregate a person-level boolean to any-covered at entity level.""" + person_entity_ids = np.asarray(person_entity_ids) + entity_ids = np.asarray(entity_ids) + person_mask = np.asarray(person_mask, dtype=bool) + if len(person_entity_ids) != len(person_mask): + raise ValueError("person_entity_ids and person_mask must align") + if not person_mask.any(): + return np.zeros(len(entity_ids), dtype=bool) + flagged_ids = np.unique(person_entity_ids[person_mask]) + return np.isin(entity_ids, flagged_ids) + + +def assign_takeup_with_reported_anchors( + draws: np.ndarray, + rates, + reported_mask: Optional[np.ndarray] = None, + group_keys: Optional[np.ndarray] = None, +) -> np.ndarray: + """Apply the SSI/SNAP-style reported-first takeup pattern. + + Reported recipients are always assigned takeup=True. Remaining + non-reporters are filled probabilistically to reach the target count + implied by the rate, either globally or within each ``group_keys`` + group. + """ + draws = np.asarray(draws, dtype=np.float64) + if np.isscalar(rates): + rates_arr = np.full(len(draws), float(rates), dtype=np.float64) + else: + rates_arr = np.asarray(rates, dtype=np.float64) + if len(rates_arr) != len(draws): + raise ValueError("rates and draws must align") + + baseline = draws < rates_arr + if reported_mask is None: + return baseline + + reported_mask = np.asarray(reported_mask, dtype=bool) + if len(reported_mask) != len(draws): + raise ValueError("reported_mask and draws must align") + + result = reported_mask.copy() + + if group_keys is None: + unique_rates = np.unique(rates_arr) + if len(unique_rates) != 1: + raise ValueError("group_keys required when rates vary by entity") + target_count = int(unique_rates[0] * len(draws)) + non_reporters = ~reported_mask + remaining_needed = max(0, target_count - int(reported_mask.sum())) + adjusted_rate = ( + remaining_needed / int(non_reporters.sum()) if non_reporters.any() else 0 + ) + result |= non_reporters & (draws < adjusted_rate) + return result + + group_keys = np.asarray(group_keys) + if len(group_keys) != len(draws): + raise ValueError("group_keys and draws must align") + + for key in np.unique(group_keys): + group_mask = group_keys == key + group_rates = np.unique(rates_arr[group_mask]) + if len(group_rates) != 1: + raise ValueError("Each takeup group must have a single rate") + target_count = int(group_rates[0] * int(group_mask.sum())) + group_reported = reported_mask[group_mask] + remaining_needed = max(0, target_count - int(group_reported.sum())) + group_non_reporters = group_mask & ~reported_mask + adjusted_rate = ( + remaining_needed / int(group_non_reporters.sum()) + if group_non_reporters.any() + else 0 + ) + result[group_non_reporters] = draws[group_non_reporters] < adjusted_rate + + return result + + def _resolve_rate( rate_or_dict, state_fips: int, @@ -184,6 +268,7 @@ def compute_block_takeup_for_entities( entity_blocks: np.ndarray, entity_hh_ids: np.ndarray = None, entity_clone_ids: np.ndarray = None, + reported_mask: Optional[np.ndarray] = None, ) -> np.ndarray: """Compute boolean takeup via block-level seeded draws. @@ -217,6 +302,7 @@ def compute_block_takeup_for_entities( n = len(entity_blocks) draws = np.zeros(n, dtype=np.float64) rates = np.ones(n, dtype=np.float64) + state_fips = np.zeros(n, dtype=np.int32) for block in np.unique(entity_blocks): if block == "": @@ -225,6 +311,7 @@ def compute_block_takeup_for_entities( sf = int(str(block)[:2]) rate = _resolve_rate(rate_or_dict, sf) rates[blk_mask] = rate + state_fips[blk_mask] = sf if entity_hh_ids is not None: for hh_id in np.unique(entity_hh_ids[blk_mask]): @@ -248,7 +335,13 @@ def compute_block_takeup_for_entities( rng = seeded_rng(var_name, salt=str(block)) draws[blk_mask] = rng.random(int(blk_mask.sum())) - return draws < rates + group_keys = state_fips if isinstance(rate_or_dict, dict) else None + return assign_takeup_with_reported_anchors( + draws, + rates, + reported_mask=reported_mask, + group_keys=group_keys, + ) def apply_block_takeup_to_arrays( @@ -260,6 +353,7 @@ def apply_block_takeup_to_arrays( time_period: int, takeup_filter: List[str] = None, precomputed_rates: Optional[Dict[str, Any]] = None, + reported_anchors: Optional[Dict[str, np.ndarray]] = None, ) -> Dict[str, np.ndarray]: """Compute block-level takeup draws from raw arrays. @@ -290,6 +384,7 @@ def apply_block_takeup_to_arrays( """ filter_set = set(takeup_filter) if takeup_filter is not None else None result = {} + reported_anchors = reported_anchors or {} for spec in SIMPLE_TAKEUP_VARS: var_name = spec["variable"] @@ -310,12 +405,16 @@ def apply_block_takeup_to_arrays( rate_or_dict = precomputed_rates[rate_key] else: rate_or_dict = load_take_up_rate(rate_key, time_period) + reported_mask = reported_anchors.get(var_name) + if reported_mask is not None and len(reported_mask) != n_ent: + raise ValueError(f"reported anchor for {var_name} has wrong length") bools = compute_block_takeup_for_entities( var_name, rate_or_dict, ent_blocks, ent_hh_ids, entity_clone_ids=ent_clone_ids, + reported_mask=reported_mask, ) result[var_name] = bools