Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion policyengine_us_data/db/etl_irs_soi.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Amount variables treated as itemized deductions. load_soi_data tests
# membership in this set: for a match it writes "itemizing filers" into the
# child stratum note and appends a `tax_unit_itemizes == 1` constraint.
ITEMIZED_DEDUCTION_VARIABLES = {
    "salt",
    "real_estate_taxes",
    "medical_expense_deduction",
}

# IRS SOI data is typically published about two years after the tax year.
IRS_SOI_LAG_YEARS = 2
Expand Down Expand Up @@ -661,7 +662,11 @@ def load_soi_data(long_dfs, year):

# Create child stratum with constraint for this IRS variable
# Note: This stratum will have the constraint that amount_variable > 0
note = f"{geo_description} filers with {amount_variable_name} > 0"
is_itemized = amount_variable_name in ITEMIZED_DEDUCTION_VARIABLES
if is_itemized:
note = f"{geo_description} itemizing filers with {amount_variable_name} > 0"
else:
note = f"{geo_description} filers with {amount_variable_name} > 0"

# Check if child stratum already exists
existing_stratum = (
Expand Down Expand Up @@ -698,6 +703,15 @@ def load_soi_data(long_dfs, year):
]
)

if is_itemized:
child_stratum.constraints_rel.append(
StratumConstraint(
constraint_variable="tax_unit_itemizes",
operation="==",
value="1",
)
)

# Add geographic constraints if applicable
if geo_info["type"] == "state":
child_stratum.constraints_rel.append(
Expand Down
108 changes: 95 additions & 13 deletions policyengine_us_data/db/etl_national_targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ def extract_national_targets(dataset: str = DEFAULT_DATASET):

# Separate tax-related targets that need filer constraint
tax_filer_targets = [
{
"variable": "qualified_business_income_deduction",
"value": 63.1e9,
"source": "Joint Committee on Taxation",
"notes": "QBI deduction tax expenditure",
"year": HARDCODED_YEAR,
},
]

# Itemized deduction targets need both filer and itemizer constraints
itemizer_targets = [
{
"variable": "salt_deduction",
"value": 21.247e9,
Expand Down Expand Up @@ -85,13 +96,6 @@ def extract_national_targets(dataset: str = DEFAULT_DATASET):
"notes": "Mortgage interest deduction tax expenditure",
"year": HARDCODED_YEAR,
},
{
"variable": "qualified_business_income_deduction",
"value": 63.1e9,
"source": "Joint Committee on Taxation",
"notes": "QBI deduction tax expenditure",
"year": HARDCODED_YEAR,
},
]

direct_sum_targets = [
Expand Down Expand Up @@ -394,6 +398,7 @@ def extract_national_targets(dataset: str = DEFAULT_DATASET):
return {
"direct_sum_targets": direct_sum_targets,
"tax_filer_targets": tax_filer_targets,
"itemizer_targets": itemizer_targets,
"conditional_count_targets": conditional_count_targets,
"cbo_targets": cbo_targets,
"treasury_targets": treasury_targets,
Expand All @@ -413,9 +418,10 @@ def transform_national_targets(raw_targets):
Returns
-------
tuple
(direct_targets_df, tax_filer_df, conditional_targets)
(direct_targets_df, tax_filer_df, itemizer_df, conditional_targets)
- direct_targets_df: DataFrame with direct sum targets
- tax_filer_df: DataFrame with tax-related targets needing filer constraint
- itemizer_df: DataFrame with itemized deduction targets needing filer + itemizer constraints
- conditional_targets: List of conditional count targets
"""

Expand Down Expand Up @@ -444,14 +450,19 @@ def transform_national_targets(raw_targets):
tax_filer_df = (
pd.DataFrame(all_tax_filer_targets) if all_tax_filer_targets else pd.DataFrame()
)
itemizer_df = (
pd.DataFrame(raw_targets["itemizer_targets"])
if raw_targets["itemizer_targets"]
else pd.DataFrame()
)

# Conditional targets stay as list for special processing
conditional_targets = raw_targets["conditional_count_targets"]

return direct_df, tax_filer_df, conditional_targets
return direct_df, tax_filer_df, itemizer_df, conditional_targets


def load_national_targets(direct_targets_df, tax_filer_df, conditional_targets):
def load_national_targets(direct_targets_df, tax_filer_df, itemizer_df, conditional_targets):
"""
Load national targets into the database.

Expand All @@ -461,6 +472,8 @@ def load_national_targets(direct_targets_df, tax_filer_df, conditional_targets):
DataFrame with direct sum target data
tax_filer_df : pd.DataFrame
DataFrame with tax-related targets needing filer constraint
itemizer_df : pd.DataFrame
DataFrame with itemized deduction targets needing filer + itemizer constraints
conditional_targets : list
List of conditional count targets requiring strata
"""
Expand Down Expand Up @@ -590,6 +603,74 @@ def load_national_targets(direct_targets_df, tax_filer_df, conditional_targets):
session.add(target)
print(f"Added filer target: {target_data['variable']}")

# Process itemized deduction targets that need filer + itemizer constraints
if not itemizer_df.empty:
national_itemizer_stratum = (
session.query(Stratum)
.filter(
Stratum.parent_stratum_id == us_stratum.stratum_id,
Stratum.notes == "United States - Itemizing Tax Filers",
)
.first()
)

if not national_itemizer_stratum:
national_itemizer_stratum = Stratum(
parent_stratum_id=us_stratum.stratum_id,
notes="United States - Itemizing Tax Filers",
)
national_itemizer_stratum.constraints_rel = [
StratumConstraint(
constraint_variable="tax_unit_is_filer",
operation="==",
value="1",
),
StratumConstraint(
constraint_variable="tax_unit_itemizes",
operation="==",
value="1",
),
]
session.add(national_itemizer_stratum)
session.flush()
print("Created national itemizer stratum")

for _, target_data in itemizer_df.iterrows():
target_year = target_data["year"]
existing_target = (
session.query(Target)
.filter(
Target.stratum_id == national_itemizer_stratum.stratum_id,
Target.variable == target_data["variable"],
Target.period == target_year,
)
.first()
)

notes_parts = []
if pd.notna(target_data.get("notes")):
notes_parts.append(target_data["notes"])
notes_parts.append(f"Source: {target_data.get('source', 'Unknown')}")
combined_notes = " | ".join(notes_parts)

if existing_target:
existing_target.value = target_data["value"]
existing_target.notes = combined_notes
existing_target.source = "PolicyEngine"
print(f"Updated itemizer target: {target_data['variable']}")
else:
target = Target(
stratum_id=national_itemizer_stratum.stratum_id,
variable=target_data["variable"],
period=target_year,
value=target_data["value"],
active=True,
source="PolicyEngine",
notes=combined_notes,
)
session.add(target)
print(f"Added itemizer target: {target_data['variable']}")

# Process conditional count targets (enrollment counts)
for cond_target in conditional_targets:
constraint_var = cond_target["constraint_variable"]
Expand Down Expand Up @@ -686,11 +767,12 @@ def load_national_targets(direct_targets_df, tax_filer_df, conditional_targets):
session.commit()

total_targets = (
len(direct_targets_df) + len(tax_filer_df) + len(conditional_targets)
len(direct_targets_df) + len(tax_filer_df) + len(itemizer_df) + len(conditional_targets)
)
print(f"\nSuccessfully loaded {total_targets} national targets")
print(f" - {len(direct_targets_df)} direct sum targets")
print(f" - {len(tax_filer_df)} tax filer targets")
print(f" - {len(itemizer_df)} itemizer targets")
print(f" - {len(conditional_targets)} enrollment count targets (as strata)")


Expand All @@ -706,13 +788,13 @@ def main():

# Transform
print("Transforming targets...")
direct_targets_df, tax_filer_df, conditional_targets = transform_national_targets(
direct_targets_df, tax_filer_df, itemizer_df, conditional_targets = transform_national_targets(
raw_targets
)

# Load
print("Loading targets into database...")
load_national_targets(direct_targets_df, tax_filer_df, conditional_targets)
load_national_targets(direct_targets_df, tax_filer_df, itemizer_df, conditional_targets)

print("\nETL pipeline complete!")

Expand Down
Loading