From 22059145ac411537d0d256e4a7f1cda113b77cef Mon Sep 17 00:00:00 2001 From: igerber Date: Sat, 4 Apr 2026 10:16:38 -0400 Subject: [PATCH 1/6] Add survey design support to EfficientDiD covariates (DR) path Thread survey weights through all five nuisance estimation steps of the doubly-robust covariates path: WLS outcome regression, weighted sieve normal equations for propensity ratios/inverse propensities, survey-weighted Nadaraya-Watson kernel for conditional Omega*(X), and survey-weighted ATT averaging. Remove the NotImplementedError guard that blocked covariates + survey_design combinations. Co-Authored-By: Claude Opus 4.6 (1M context) --- TODO.md | 1 + diff_diff/efficient_did.py | 25 ++- diff_diff/efficient_did_covariates.py | 111 +++++++++++--- docs/methodology/REGISTRY.md | 2 +- docs/survey-roadmap.md | 2 +- tests/test_survey_phase3.py | 209 ++++++++++++++++++++++++-- 6 files changed, 303 insertions(+), 47 deletions(-) diff --git a/TODO.md b/TODO.md index 748a2dd8..9ea2f232 100644 --- a/TODO.md +++ b/TODO.md @@ -69,6 +69,7 @@ Deferred items from PR reviews that were not addressed before merge. | TripleDifference power: `generate_ddd_data` is a fixed 2×2×2 cross-sectional DGP — no multi-period or unbalanced-group support. Add a `generate_ddd_panel_data` for panel DDD power analysis. | `prep_dgp.py`, `power.py` | #208 | Low | | ContinuousDiD event-study aggregation anticipation filter — **Resolved**. `_aggregate_event_study()` now filters `e < -anticipation` when `anticipation > 0`, matching CallawaySantAnna behavior. Bootstrap paths also filtered. | `continuous_did.py` | #226 | Resolved | | Survey design resolution/collapse patterns are inconsistent across panel estimators — ContinuousDiD rebuilds unit-level design in SE code, EfficientDiD builds once in fit(), StackedDiD re-resolves on stacked data; extract shared helpers for panel-to-unit collapse, post-filter re-resolution, and metadata recomputation | `continuous_did.py`, `efficient_did.py`, `stacked_did.py` | #226 | Low | +| Survey-weighted Silverman bandwidth in EfficientDiD conditional Omega* — `_silverman_bandwidth()` uses unweighted mean/std for bandwidth selection; survey-weighted statistics would better reflect the population distribution but is a second-order refinement | `efficient_did_covariates.py` | — | Low | | Survey metadata formatting dedup — **Resolved**. Extracted `_format_survey_block()` helper in `results.py`, replaced 13 occurrences across 11 files. | `results.py` + 10 results files | — | Resolved | | TROP: `fit()` and `_fit_global()` share ~150 lines of near-identical data setup (panel pivoting, absorbing-state validation, first-treatment detection, effective rank, NaN warnings). Both bootstrap methods also duplicate the stratified resampling loop. Extract shared helpers to eliminate cross-file sync risk. | `trop.py`, `trop_global.py`, `trop_local.py` | — | Low | | StaggeredTripleDifference R cross-validation: CSV fixtures not committed (gitignored); tests skip without local R + triplediff. Commit fixtures or generate deterministically. | `tests/test_methodology_staggered_triple_diff.py` | #245 | Medium | diff --git a/diff_diff/efficient_did.py b/diff_diff/efficient_did.py index d25f0512..3e4d419b 100644 --- a/diff_diff/efficient_did.py +++ b/diff_diff/efficient_did.py @@ -347,8 +347,6 @@ def fit( ValueError Missing columns, unbalanced panel, non-absorbing treatment, or PT-Post without a never-treated group. - NotImplementedError - If ``covariates`` and ``survey_design`` are both set. """ self._validate_params() @@ -381,16 +379,6 @@ def fit( # Bootstrap + survey supported via PSU-level multiplier bootstrap. - # Guard covariates + survey (DR path does not yet thread survey weights) - if covariates is not None and len(covariates) > 0 and resolved_survey is not None: - raise NotImplementedError( - "Survey weights with covariates are not yet supported for " - "EfficientDiD. The doubly robust covariate path does not " - "thread survey weights through nuisance estimation. " - "Use covariates=None with survey_design, or drop survey_design " - "when using covariates." - ) - # Normalize empty covariates list to None (use nocov path) if covariates is not None and len(covariates) == 0: covariates = None @@ -742,6 +730,7 @@ def fit( never_treated_mask, t_col_val, tpre_col_val, + unit_weights=unit_level_weights, ) # m_{g', tpre, 1}(X) key_gp_tpre = (gp, tpre_col_val, effective_p1_col) @@ -755,6 +744,7 @@ def fit( gp_mask_for_reg, tpre_col_val, effective_p1_col, + unit_weights=unit_level_weights, ) # r_{g, inf}(X) and r_{g, g'}(X) via sieve (Eq 4.1-4.2) for comp in {np.inf, gp}: @@ -770,6 +760,7 @@ def fit( k_max=self.sieve_k_max, criterion=self.sieve_criterion, ratio_clip=self.ratio_clip, + unit_weights=unit_level_weights, ) # Per-unit DR generated outcomes: shape (n_units, H) @@ -801,6 +792,7 @@ def fit( group_mask_s, k_max=self.sieve_k_max, criterion=self.sieve_criterion, + unit_weights=unit_level_weights, ) # Conditional Omega*(X) with per-unit propensities (Eq 3.12) @@ -817,14 +809,19 @@ def fit( covariate_matrix=covariate_matrix, s_hat_cache=s_hat_cache, bandwidth=self.kernel_bandwidth, + unit_weights=unit_level_weights, ) # Per-unit weights: (n_units, H) per_unit_w = compute_per_unit_weights(omega_cond) - # ATT = mean_i( w(X_i) @ gen_out[i] ) + # ATT = (survey-)weighted mean of per-unit DR scores if per_unit_w.shape[1] > 0: - att_gt = float(np.mean(np.sum(per_unit_w * gen_out, axis=1))) + per_unit_scores = np.sum(per_unit_w * gen_out, axis=1) + if unit_level_weights is not None: + att_gt = float(np.average(per_unit_scores, weights=unit_level_weights)) + else: + att_gt = float(np.mean(per_unit_scores)) else: att_gt = np.nan diff --git a/diff_diff/efficient_did_covariates.py b/diff_diff/efficient_did_covariates.py index 9e3052c2..3f76e638 100644 --- a/diff_diff/efficient_did_covariates.py +++ b/diff_diff/efficient_did_covariates.py @@ -37,6 +37,7 @@ def estimate_outcome_regression( group_mask: np.ndarray, t_col: int, tpre_col: int, + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: """Estimate conditional mean outcome change m_hat(X) for a comparison group. @@ -56,6 +57,9 @@ def estimate_outcome_regression( Mask selecting units in the comparison group. t_col, tpre_col : int Column indices in ``outcome_wide`` for the two time periods. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses WLS + instead of OLS for the within-group regression. Returns ------- @@ -68,9 +72,13 @@ def estimate_outcome_regression( X_group = covariate_matrix[group_mask] X_design = np.column_stack([np.ones(len(X_group)), X_group]) + w_group = unit_weights[group_mask] if unit_weights is not None else None + coef, _, _ = solve_ols( X_design, delta_y, + weights=w_group, + weight_type="pweight" if w_group is not None else None, return_vcov=False, rank_deficient_action="warn", ) @@ -121,7 +129,9 @@ def _polynomial_sieve_basis(X: np.ndarray, degree: int) -> np.ndarray: """ n, d = X.shape - # Standardize for numerical stability + # Standardize for numerical stability (unweighted mean/std intentional — + # this is only for conditioning, not for the statistical estimand; with + # survey weights the sieve basis is the same, only the objective changes) X_mean = X.mean(axis=0) X_std = X.std(axis=0) X_std[X_std < 1e-10] = 1.0 # avoid division by zero for constant columns @@ -146,6 +156,7 @@ def estimate_propensity_ratio_sieve( k_max: Optional[int] = None, criterion: str = "bic", ratio_clip: float = 20.0, + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: r"""Estimate propensity ratio via sieve convex minimization (Eq 4.1-4.2). @@ -176,6 +187,9 @@ def estimate_propensity_ratio_sieve( ``"aic"`` or ``"bic"``. ratio_clip : float Clip ratios to ``[1/ratio_clip, ratio_clip]``. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses weighted + normal equations for the sieve estimation. Returns ------- @@ -197,9 +211,20 @@ def estimate_propensity_ratio_sieve( k_max = max(k_max, 1) # Penalty multiplier for IC + # BIC penalty uses observation count (not weighted) — complexity vs distinct obs n_total = int(np.sum(mask_g)) + n_gp c_n = 2.0 if criterion == "aic" else np.log(max(n_total, 2)) + # Weighted totals for loss normalization (raw probability weights) + if unit_weights is not None: + w_g = unit_weights[mask_g] + w_gp = unit_weights[mask_gp] + n_total_w = float(np.sum(w_g)) + float(np.sum(w_gp)) + else: + w_g = None + w_gp = None + n_total_w = float(n_total) + best_ic = np.inf best_ratio = np.ones(n_units) # fallback: constant ratio 1 @@ -214,9 +239,15 @@ def estimate_propensity_ratio_sieve( Psi_gp = basis_all[mask_gp] # (n_gp, n_basis) Psi_g = basis_all[mask_g] # (n_g, n_basis) - # Normal equations: (Psi_gp' Psi_gp) beta = Psi_g.sum(axis=0) - A = Psi_gp.T @ Psi_gp - b = Psi_g.sum(axis=0) + # Normal equations (weighted when survey weights present): + # Unweighted: (Psi_gp' Psi_gp) beta = Psi_g.sum(axis=0) + # Weighted: (Psi_gp' W_gp Psi_gp) beta = (w_g * Psi_g).sum(axis=0) + if w_gp is not None: + A = Psi_gp.T @ (w_gp[:, None] * Psi_gp) + b = (w_g[:, None] * Psi_g).sum(axis=0) + else: + A = Psi_gp.T @ Psi_gp + b = Psi_g.sum(axis=0) try: beta = np.linalg.solve(A, b) @@ -230,11 +261,12 @@ def estimate_propensity_ratio_sieve( # Predicted ratio for all units r_hat = basis_all @ beta - # IC selection: loss at optimum = -(1/n) * b'beta - # Derivation: L(beta) = (1/n)(beta'A*beta - 2*b'beta). + # IC selection: loss at optimum = -(1/n_w) * b'beta + # Derivation: L(beta) = (1/n_w)(beta'A*beta - 2*b'beta). # At optimum A*beta = b, so beta'A*beta = b'beta. - # Therefore L = (1/n)(b'beta - 2*b'beta) = -(1/n)*b'beta. - loss = -float(b @ beta) / n_total + # Therefore L = (1/n_w)(b'beta - 2*b'beta) = -(1/n_w)*b'beta. + # Loss uses weighted totals; BIC penalty uses observation count. + loss = -float(b @ beta) / n_total_w ic_val = 2.0 * loss + c_n * n_basis / n_total if ic_val < best_ic: @@ -280,6 +312,7 @@ def estimate_inverse_propensity_sieve( group_mask: np.ndarray, k_max: Optional[int] = None, criterion: str = "bic", + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: r"""Estimate s_{g'}(X) = 1/p_{g'}(X) via sieve convex minimization. @@ -305,6 +338,9 @@ def estimate_inverse_propensity_sieve( Maximum polynomial degree. None = auto. criterion : str ``"aic"`` or ``"bic"``. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses weighted + normal equations for the sieve estimation. Returns ------- @@ -322,10 +358,21 @@ def estimate_inverse_propensity_sieve( k_max = min(int(n_group**0.2), 5) k_max = max(k_max, 1) + # BIC penalty uses observation count (not weighted) c_n = 2.0 if criterion == "aic" else np.log(max(n_units, 2)) + # Weighted loss normalization and fallback + if unit_weights is not None: + w_group = unit_weights[group_mask] + n_units_w = float(np.sum(unit_weights)) + fallback_ratio = n_units_w / float(np.sum(w_group)) + else: + w_group = None + n_units_w = float(n_units) + fallback_ratio = n_units / n_group + best_ic = np.inf - best_s = np.full(n_units, n_units / n_group) # fallback: unconditional + best_s = np.full(n_units, fallback_ratio) # fallback: unconditional for K in range(1, k_max + 1): n_basis = comb(K + d, d) @@ -335,9 +382,16 @@ def estimate_inverse_propensity_sieve( basis_all = _polynomial_sieve_basis(covariate_matrix, K) Psi_gp = basis_all[group_mask] - A = Psi_gp.T @ Psi_gp - # RHS: sum of basis over ALL units (not just one group) - b = basis_all.sum(axis=0) + # Normal equations (weighted when survey weights present): + # Unweighted: (Psi_gp' Psi_gp) beta = Psi_all.sum(axis=0) + # Weighted: (Psi_gp' W_group Psi_gp) beta = (w_all * Psi_all).sum(axis=0) + if w_group is not None: + A = Psi_gp.T @ (w_group[:, None] * Psi_gp) + b = (unit_weights[:, None] * basis_all).sum(axis=0) + else: + A = Psi_gp.T @ Psi_gp + # RHS: sum of basis over ALL units (not just one group) + b = basis_all.sum(axis=0) try: beta = np.linalg.solve(A, b) @@ -348,8 +402,9 @@ def estimate_inverse_propensity_sieve( s_hat = basis_all @ beta - # IC: loss = -(1/n) * b'beta (same derivation as ratio estimator) - loss = -float(b @ beta) / n_units + # IC: loss = -(1/n_w) * b'beta (same derivation as ratio estimator) + # Loss uses weighted totals; BIC penalty uses observation count. + loss = -float(b @ beta) / n_units_w ic_val = 2.0 * loss + c_n * n_basis / n_units if ic_val < best_ic: @@ -496,6 +551,7 @@ def _kernel_weights_matrix( X_all: np.ndarray, X_group: np.ndarray, bandwidth: float, + group_weights: Optional[np.ndarray] = None, ) -> np.ndarray: """Gaussian kernel weight matrix. @@ -503,11 +559,21 @@ def _kernel_weights_matrix( normalized kernel weight ``K_h(X_group[j], X_all[i])``. Each row sums to 1 (Nadaraya-Watson normalization). + + Parameters + ---------- + group_weights : ndarray, shape (n_group,), optional + Survey weights for the group units. When provided, kernel + weights are multiplied by survey weights before row-normalization, + making the Nadaraya-Watson estimator survey-weighted. """ # Squared distances: (n_all, n_group) dist_sq = cdist(X_all, X_group, metric="sqeuclidean") # Gaussian kernel raw = np.exp(-dist_sq / (2.0 * bandwidth**2)) + # Survey-weight: each group unit j contributes ∝ w_j * K_h(X_i, X_j) + if group_weights is not None: + raw = raw * group_weights[np.newaxis, :] # Normalize each row row_sums = raw.sum(axis=1, keepdims=True) row_sums[row_sums < 1e-15] = 1.0 # avoid division by zero @@ -559,6 +625,7 @@ def compute_omega_star_conditional( covariate_matrix: np.ndarray, s_hat_cache: Dict[float, np.ndarray], bandwidth: Optional[float] = None, + unit_weights: Optional[np.ndarray] = None, never_treated_val: float = np.inf, ) -> np.ndarray: r"""Kernel-smoothed conditional Omega\*(X_i) for each unit (Eq 3.12). @@ -583,6 +650,9 @@ def compute_omega_star_conditional( value is shape ``(n_units,)``. Keyed by group identifier. bandwidth : float or None Kernel bandwidth. None = Silverman's rule. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, kernel-smoothed + covariances use survey-weighted Nadaraya-Watson regression. never_treated_val : float Returns @@ -622,13 +692,17 @@ def compute_omega_star_conditional( stacklevel=2, ) + # Per-group survey weights for kernel smoothing + w_g = unit_weights[g_mask] if unit_weights is not None else None + w_inf = unit_weights[never_treated_mask] if unit_weights is not None else None + # Pre-compute kernel weight matrices per group Y_g = outcome_wide[g_mask] X_g = covariate_matrix[g_mask] Yg_t_minus_1 = Y_g[:, t_col] - Y_g[:, y1_col] - W_g = _kernel_weights_matrix(covariate_matrix, X_g, bandwidth) - W_inf = _kernel_weights_matrix(covariate_matrix, X_inf, bandwidth) + W_g = _kernel_weights_matrix(covariate_matrix, X_g, bandwidth, group_weights=w_g) + W_inf = _kernel_weights_matrix(covariate_matrix, X_inf, bandwidth, group_weights=w_inf) inf_t_minus_tpre = {} for _, tpre in valid_pairs: @@ -683,7 +757,10 @@ def compute_omega_star_conditional( ) if gp_j not in W_gp_cache: X_gp = covariate_matrix[cohort_masks[gp_j]] - W_gp_cache[gp_j] = _kernel_weights_matrix(covariate_matrix, X_gp, bandwidth) + w_gp_j = unit_weights[cohort_masks[gp_j]] if unit_weights is not None else None + W_gp_cache[gp_j] = _kernel_weights_matrix( + covariate_matrix, X_gp, bandwidth, group_weights=w_gp_j + ) gp_outcomes_cache[gp_j] = outcome_wide[cohort_masks[gp_j]] W_gp = W_gp_cache[gp_j] Y_gp = gp_outcomes_cache[gp_j] diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 77d85feb..ff669d44 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -703,7 +703,7 @@ where `q_{g,e} = pi_g / sum_{g' in G_{trt,e}} pi_{g'}`. - **Note:** Conditional covariance Omega*(X) scales each term by per-unit sieve-estimated inverse propensities s_hat_{g'}(X) = 1/p_{g'}(X) (algorithm step 4), matching Eq 3.12. The inverse propensity estimation uses the same polynomial sieve convex minimization as the ratio estimator. Estimated s_hat values are clipped to [1, n] with a UserWarning when clipping binds, mirroring the ratio path's overlap diagnostics. - **Note:** Outcome regressions m_hat_{g',t,tpre}(X) use linear OLS working models. The paper's Section 4 describes flexible nonparametric nuisance estimation (sieve regression, kernel smoothing, or ML methods). The DR property ensures consistency if either the OLS outcome model or the sieve propensity ratio is correctly specified, but the linear OLS specification does not generically guarantee attainment of the semiparametric efficiency bound unless the conditional mean is linear in the covariates. - **Note:** EfficientDiD bootstrap with survey weights supported (Phase 6) via PSU-level multiplier weights -- **Note:** EfficientDiD covariates (DR path) with survey weights deferred — the doubly robust nuisance estimation does not yet thread survey weights through sieve/kernel steps +- **Note:** EfficientDiD covariates (DR path) with survey weights supported — WLS outcome regression, weighted sieve normal equations for propensity ratios/inverse propensities, survey-weighted Nadaraya-Watson kernel for conditional Omega*(X), and survey-weighted ATT averaging. Silverman bandwidth uses unweighted statistics (survey-weighted bandwidth deferred as second-order refinement). - **Note:** Cluster-robust SEs use the standard Liang-Zeger clustered sandwich estimator applied to EIF values: aggregate EIF within clusters, center, and compute variance with G/(G-1) small-sample correction. Cluster bootstrap generates multiplier weights at the cluster level (all units in a cluster share the same weight). Analytical clustered SEs are the default when `cluster` is set; cluster bootstrap is opt-in via `n_bootstrap > 0`. - **Note:** Hausman pretest operates on the post-treatment event-study vector ES(e) per Theorem A.1. Both PT-All and PT-Post fits are aggregated to ES(e) using cohort-size weights before computing the test statistic H = delta' V^{-1} delta where delta = ES_post - ES_all and V = Cov(ES_post) - Cov(ES_all). Covariance is computed from aggregated ES(e)-level EIF values. The variance-difference matrix V is inverted via Moore-Penrose pseudoinverse to handle finite-sample non-positive-definiteness. Effective rank of V (number of positive eigenvalues) is used as degrees of freedom. - **Note:** Last-cohort-as-control (`control_group="last_cohort"`) reclassifies the latest treatment cohort as pseudo-never-treated and drops time periods at/after that cohort's treatment start. This is distinct from CallawaySantAnna's `not_yet_treated` option which dynamically selects not-yet-treated units per (g,t) pair. diff --git a/docs/survey-roadmap.md b/docs/survey-roadmap.md index 56311734..274f9c6e 100644 --- a/docs/survey-roadmap.md +++ b/docs/survey-roadmap.md @@ -36,7 +36,7 @@ Phase 5 infrastructure (bootstrap+survey interaction): | SunAbraham | Pairs bootstrap + survey | **Resolved** (Phase 6, Rao-Wu rescaled) | | ContinuousDiD | Multiplier bootstrap + survey | **Resolved** (Phase 6, multiplier at PSU) | | EfficientDiD | Multiplier bootstrap + survey | **Resolved** (Phase 6, multiplier at PSU) | -| EfficientDiD | Covariates (DR path) + survey | DR nuisance estimation needs survey weight threading | +| EfficientDiD | Covariates (DR path) + survey | **Resolved**: survey weights threaded through DR nuisance estimation | All blocked combinations raise `NotImplementedError` when attempted, with a message pointing to the planned phase or describing the limitation. diff --git a/tests/test_survey_phase3.py b/tests/test_survey_phase3.py index f43c4968..2148ebd2 100644 --- a/tests/test_survey_phase3.py +++ b/tests/test_survey_phase3.py @@ -745,23 +745,29 @@ def test_bootstrap_survey_supported(self, staggered_survey_data): assert np.isfinite(result.overall_att) assert np.isfinite(result.overall_se) - def test_covariates_survey_raises(self, staggered_survey_data): - """Covariates + survey should raise NotImplementedError.""" + def test_covariates_survey_works(self, staggered_survey_data): + """Covariates + survey should produce finite results via DR path.""" from diff_diff import EfficientDiD - # Add a covariate column - staggered_survey_data["x1"] = np.random.randn(len(staggered_survey_data)) + # Add a time-invariant covariate + np.random.seed(123) + unit_vals = {u: np.random.randn() for u in staggered_survey_data["unit"].unique()} + staggered_survey_data["x1"] = staggered_survey_data["unit"].map(unit_vals) sd = SurveyDesign(weights="weight") - with pytest.raises(NotImplementedError, match="covariates"): - EfficientDiD(n_bootstrap=0).fit( - staggered_survey_data, - "outcome", - "unit", - "time", - "first_treat", - covariates=["x1"], - survey_design=sd, - ) + result = EfficientDiD(n_bootstrap=0).fit( + staggered_survey_data, + "outcome", + "unit", + "time", + "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + assert result.estimation_path == "dr" + assert result.survey_metadata is not None def test_survey_metadata_fields(self, staggered_survey_data): """survey_metadata has correct fields.""" @@ -864,6 +870,181 @@ def test_survey_all_aggregation(self, staggered_survey_data): assert np.isfinite(result.overall_se) +# ============================================================================= +# EfficientDiD Covariates + Survey +# ============================================================================= + + +class TestEfficientDiDCovSurvey: + """Survey design support for EfficientDiD covariates (DR) path.""" + + @pytest.fixture + def cov_survey_data(self): + """Staggered panel with time-invariant covariates and survey columns.""" + np.random.seed(42) + n_units = 60 + n_periods = 8 + rows = [] + # Assign a time-invariant covariate per unit + unit_x1 = {u: np.random.randn() for u in range(n_units)} + for unit in range(n_units): + if unit < 20: + ft = 4 + elif unit < 40: + ft = 6 + else: + ft = 0 + + stratum = unit // 12 + psu = unit // 5 + fpc_val = 120.0 + wt = 1.0 + 0.3 * stratum + + for t in range(1, n_periods + 1): + y = 10.0 + unit * 0.05 + t * 0.2 + 0.5 * unit_x1[unit] + if ft > 0 and t >= ft: + y += 2.0 + y += np.random.normal(0, 0.5) + + rows.append( + { + "unit": unit, + "time": t, + "first_treat": ft, + "outcome": y, + "weight": wt, + "stratum": stratum, + "psu": psu, + "fpc": fpc_val, + "x1": unit_x1[unit], + } + ) + return pd.DataFrame(rows) + + def test_uniform_weight_equivalence(self, cov_survey_data): + """Covariates + uniform survey weights ≈ covariates without survey.""" + from diff_diff import EfficientDiD + + # Set all weights to 1.0 + cov_survey_data["weight"] = 1.0 + sd = SurveyDesign(weights="weight") + + result_survey = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_nosurv = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + ) + assert result_survey.estimation_path == "dr" + assert result_nosurv.estimation_path == "dr" + np.testing.assert_allclose( + result_survey.overall_att, result_nosurv.overall_att, atol=1e-8 + ) + + def test_scale_invariance(self, cov_survey_data): + """Multiplying all weights by a constant doesn't change ATT.""" + from diff_diff import EfficientDiD + + sd1 = SurveyDesign(weights="weight") + result1 = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd1, + ) + + cov_survey_data["weight_scaled"] = cov_survey_data["weight"] * 5.0 + sd2 = SurveyDesign(weights="weight_scaled") + result2 = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd2, + ) + np.testing.assert_allclose( + result1.overall_att, result2.overall_att, atol=1e-8 + ) + + def test_nontrivial_weight_effect(self, cov_survey_data): + """Heterogeneous weights produce different ATT from unweighted.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result_survey = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_nosurv = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + ) + # Non-uniform weights (1.0 + 0.3*stratum) should produce different ATT + assert abs(result_survey.overall_att - result_nosurv.overall_att) > 1e-6 + + def test_full_design_smoke(self, cov_survey_data): + """Strata + PSU + FPC + covariates produces finite results.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign( + weights="weight", strata="stratum", psu="psu", fpc="fpc", + nest=True, + ) + result = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + assert result.estimation_path == "dr" + + def test_aggregation_with_survey(self, cov_survey_data): + """Event study and group aggregation work with covariates + survey.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + aggregate="all", + survey_design=sd, + ) + assert result.event_study_effects is not None + assert result.group_effects is not None + for _, eff in result.event_study_effects.items(): + assert np.isfinite(eff["effect"]) + assert np.isfinite(eff["se"]) + for _, eff in result.group_effects.items(): + assert np.isfinite(eff["effect"]) + assert np.isfinite(eff["se"]) + + def test_bootstrap_covariates_survey(self, cov_survey_data): + """Bootstrap + covariates + survey produces finite results.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result = EfficientDiD(n_bootstrap=30, seed=42).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + + # ============================================================================= # Scale Invariance (applies to all estimators) # ============================================================================= From 0a12a77e5835bac4c00645e076b4e5ddb80a75ec Mon Sep 17 00:00:00 2001 From: igerber Date: Sat, 4 Apr 2026 10:55:05 -0400 Subject: [PATCH 2/6] Address P1/P2 review findings: survey-weighted WIF, bootstrap denominator, zero-weight guard - Make _compute_wif_contribution() survey-aware: use w_i * 1{G_i=g} - pg_k formula when unit_weights present, matching staggered_aggregation.py - Use explicit sum(unit_level_weights) denominator in bootstrap perturbation when survey design is active - Guard zero-weight cohorts: skip in fit loop, early return in compute_generated_outcomes_cov when pi_g <= 0 - Add regression tests: analytical SE differs from unweighted, bootstrap SE in ballpark of analytical, zero-weight cohort handled gracefully - Update tutorial notebook: remove stale note about covariates+survey Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/efficient_did.py | 34 +++++++++-- diff_diff/efficient_did_bootstrap.py | 11 +++- diff_diff/efficient_did_covariates.py | 4 ++ docs/tutorials/16_survey_did.ipynb | 86 +++++++++++---------------- tests/test_survey_phase3.py | 60 +++++++++++++++++++ 5 files changed, 140 insertions(+), 55 deletions(-) diff --git a/diff_diff/efficient_did.py b/diff_diff/efficient_did.py index 3e4d419b..1cb529b4 100644 --- a/diff_diff/efficient_did.py +++ b/diff_diff/efficient_did.py @@ -571,6 +571,7 @@ def fit( # Use the resolved survey's weights (already normalized per weight_type) # subset to unit level via _unit_first_panel_row (aligned to all_units) unit_level_weights = self._unit_resolved_survey.weights + self._unit_level_weights = unit_level_weights cohort_fractions: Dict[float, float] = {} if unit_level_weights is not None: @@ -674,6 +675,15 @@ def fit( else: effective_p1_col = period_1_col + # Guard: skip cohorts with zero survey weight (all units zero-weighted) + if cohort_fractions[g] <= 0: + warnings.warn( + f"Cohort {g} has zero survey weight; skipping.", + UserWarning, + stacklevel=2, + ) + continue + # Estimate all (g, t) cells including pre-treatment. Under PT-Post, # pre-treatment cells serve as placebo/pre-trend diagnostics, matching # the CallawaySantAnna implementation. Users filter to t >= g for @@ -976,6 +986,7 @@ def fit( cluster_indices=unit_cluster_indices, n_clusters=n_clusters, resolved_survey=self._unit_resolved_survey, + unit_level_weights=self._unit_level_weights, ) # Update estimates with bootstrap inference overall_se = bootstrap_results.overall_att_se @@ -1137,6 +1148,7 @@ def _compute_wif_contribution( unit_cohorts: np.ndarray, cohort_fractions: Dict[float, float], n_units: int, + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: """Compute weight influence function correction (O(1) scale, matching EIF). @@ -1156,6 +1168,9 @@ def _compute_wif_contribution( ``{cohort: n_cohort / n}`` for each cohort. n_units : int Total number of units. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses the + survey-weighted WIF formula: IF_i(p_g) = (w_i * 1{G_i=g} - pg_k). Returns ------- @@ -1169,10 +1184,19 @@ def _compute_wif_contribution( return np.zeros(n_units) indicator = (unit_cohorts[:, None] == groups_for_keepers[None, :]).astype(float) - indicator_sum = np.sum(indicator - pg_keepers, axis=1) + + if unit_weights is not None: + # Survey-weighted WIF (matches staggered_aggregation.py:392-401): + # IF_i(p_g) = (w_i * 1{G_i=g} - pg_k), NOT (1{G_i=g} - pg_k) + weighted_indicator = indicator * unit_weights[:, None] + indicator_diff = weighted_indicator - pg_keepers + indicator_sum = np.sum(indicator_diff, axis=1) + else: + indicator_diff = indicator - pg_keepers + indicator_sum = np.sum(indicator_diff, axis=1) with np.errstate(divide="ignore", invalid="ignore", over="ignore"): - if1 = (indicator - pg_keepers) / sum_pg + if1 = indicator_diff / sum_pg if2 = np.outer(indicator_sum, pg_keepers) / sum_pg**2 wif_matrix = if1 - if2 wif_contrib = wif_matrix @ effects @@ -1229,7 +1253,8 @@ def _aggregate_overall( # WIF correction: accounts for uncertainty in cohort-size weights wif = self._compute_wif_contribution( - keepers, effects, unit_cohorts, cohort_fractions, n_units + keepers, effects, unit_cohorts, cohort_fractions, n_units, + unit_weights=self._unit_level_weights, ) agg_eif_total = agg_eif + wif # both O(1) scale @@ -1325,7 +1350,8 @@ def _aggregate_event_study( es_keepers = [(g, t) for (g, t) in gt_pairs] es_effects = effs wif = self._compute_wif_contribution( - es_keepers, es_effects, unit_cohorts, cohort_fractions, n_units + es_keepers, es_effects, unit_cohorts, cohort_fractions, n_units, + unit_weights=self._unit_level_weights, ) agg_eif = agg_eif + wif diff --git a/diff_diff/efficient_did_bootstrap.py b/diff_diff/efficient_did_bootstrap.py index cbeb8161..03ba4077 100644 --- a/diff_diff/efficient_did_bootstrap.py +++ b/diff_diff/efficient_did_bootstrap.py @@ -63,6 +63,7 @@ def _run_multiplier_bootstrap( cluster_indices: Optional[np.ndarray] = None, n_clusters: Optional[int] = None, resolved_survey: object = None, + unit_level_weights: Optional[np.ndarray] = None, ) -> EDiDBootstrapResults: """Run multiplier bootstrap on stored EIF values. @@ -136,11 +137,19 @@ def _run_multiplier_bootstrap( original_atts = np.array([group_time_effects[gt]["effect"] for gt in gt_pairs]) # Perturbed ATTs: (n_bootstrap, n_gt) + # Under survey design, normalize by sum(survey_weights) instead of n_units + # (pweights are normalized to mean=1, so numerically equivalent, but explicit + # for robustness against future weight types) + denom = ( + float(np.sum(unit_level_weights)) + if unit_level_weights is not None + else float(n_units) + ) bootstrap_atts = np.zeros((self.n_bootstrap, n_gt)) for j, gt in enumerate(gt_pairs): eif_gt = eif_by_gt[gt] # shape (n_units,) with np.errstate(divide="ignore", invalid="ignore", over="ignore"): - perturbation = (all_weights @ eif_gt) / n_units + perturbation = (all_weights @ eif_gt) / denom bootstrap_atts[:, j] = original_atts[j] + perturbation # Post-treatment mask — also exclude NaN effects diff --git a/diff_diff/efficient_did_covariates.py b/diff_diff/efficient_did_covariates.py index 3f76e638..167985de 100644 --- a/diff_diff/efficient_did_covariates.py +++ b/diff_diff/efficient_did_covariates.py @@ -488,6 +488,10 @@ def compute_generated_outcomes_cov( g_mask = cohort_masks[target_g] pi_g = cohort_fractions[target_g] + # Guard: zero survey weight for the target cohort → no DR estimation possible + if pi_g <= 0: + return np.zeros((n_units, H)) + gen_out = np.zeros((n_units, H)) for j, (gp, tpre) in enumerate(valid_pairs): diff --git a/docs/tutorials/16_survey_did.ipynb b/docs/tutorials/16_survey_did.ipynb index aa1ce756..c8d056df 100644 --- a/docs/tutorials/16_survey_did.ipynb +++ b/docs/tutorials/16_survey_did.ipynb @@ -95,7 +95,7 @@ "**About the normalization warning:** You'll see `pweight weights normalized to mean=1` throughout this tutorial. ", "Survey weights are inverse selection probabilities -- they rarely have mean=1 out of the box. ", "The library rescales them internally so that weighted estimators are numerically stable. ", - "This is standard practice (Lumley 2004, §2.2). ", + "This is standard practice (Lumley 2004, \u00a72.2). ", "The warning confirms rescaling occurred; it is not an error." ] }, @@ -356,9 +356,8 @@ "- **JKn** (Jackknife delete-n): Stratified jackknife, drops one PSU per stratum.\n", "- **BRR** (Balanced Repeated Replication): Halve each stratum, reweight.\n", "- **Fay's BRR**: Modified BRR with a damping factor (0 < rho < 1).\n", - "- **SDR** (Successive Difference Replication): Used by ACS PUMS (80 replicate columns). Variance: V = 4/R × Σ(θᵣ − θ)².\n", "\n", - "`SurveyDesign` accepts replicate weights as an alternative to strata/PSU/FPC. They are **mutually exclusive** -- use one or the other.\n" + "`SurveyDesign` accepts replicate weights as an alternative to strata/PSU/FPC. They are **mutually exclusive** -- use one or the other." ] }, { @@ -531,58 +530,45 @@ "source": [ "## 9. Which Estimators Support Survey Design?\n", "\n", - "All estimators accept `survey_design` in `fit()`. Support depth varies — see the\n", - "[Survey Design Support](https://diff-diff.readthedocs.io/en/latest/choosing_estimator.html#survey-design-support)\n", - "table in the Choosing an Estimator guide for the full compatibility matrix.\n", - "\n", - "**Key highlights:**\n", - "- **CallawaySantAnna** has the most complete support: all design elements, replicate weights, analytical and bootstrap SEs, panel and cross-section modes\n", - "- **12 of 15 estimators** support replicate weights (BRR, Fay, JK1, JKn, SDR)\n", - "- **SyntheticDiD** and **TROP** support strata/PSU/FPC via bootstrap only (not placebo/analytical)\n" + "`diff-diff` supports survey design across all estimators, though the level of support varies:\n", + "\n", + "| Estimator | Weights | Strata/PSU/FPC (TSL) | Replicate Weights | Survey-Aware Bootstrap |\n", + "|-----------|---------|---------------------|-------------------|------------------------|\n", + "| **DifferenceInDifferences** | Full | Full | -- | -- |\n", + "| **TwoWayFixedEffects** | Full | Full | -- | -- |\n", + "| **MultiPeriodDiD** | Full | Full | -- | -- |\n", + "| **CallawaySantAnna** | pweight only | Full | Full | Multiplier at PSU |\n", + "| **TripleDifference** | pweight only | Full | Full (analytical) | -- |\n", + "| **StaggeredTripleDifference** | pweight only | Full | Full | Multiplier at PSU |\n", + "| **SunAbraham** | Full | Full | -- | Rao-Wu rescaled |\n", + "| **StackedDiD** | pweight only | Full (pweight only) | -- | -- |\n", + "| **ImputationDiD** | pweight only | Partial (no FPC) | -- | Multiplier at PSU |\n", + "| **TwoStageDiD** | pweight only | Partial (no FPC) | -- | Multiplier at PSU |\n", + "| **ContinuousDiD** | Full | Full | Full (analytical) | Multiplier at PSU |\n", + "| **EfficientDiD** | Full | Full | Full (analytical) | Multiplier at PSU |\n", + "| **SyntheticDiD** | pweight only | -- | -- | Rao-Wu rescaled |\n", + "| **TROP** | pweight only | -- | -- | Rao-Wu rescaled |\n", + "| **BaconDecomposition** | Diagnostic | Diagnostic | -- | -- |\n", + "\n", + "**Legend:**\n", + "- **Full**: All weight types (pweight/fweight/aweight) + strata/PSU/FPC + Taylor Series Linearization variance\n", + "- **Full (pweight only)**: Full TSL support with strata/PSU/FPC, but only accepts `pweight` weight type (`fweight`/`aweight` rejected because Q-weight composition changes their semantics)\n", + "- **Partial (no FPC)**: Weights + strata (for df) + PSU (for clustering); FPC raises `NotImplementedError`\n", + "- **pweight only** (Weights column): Only `pweight` accepted; `fweight`/`aweight` raise an error\n", + "- **pweight only** (TSL column): Sampling weights for point estimates; no strata/PSU/FPC design elements\n", + "- **Diagnostic**: Weighted descriptive statistics only (no inference)\n", + "- **--**: Not supported\n", + "\n", + "**Note:** `EfficientDiD` supports `covariates` and `survey_design` simultaneously. The doubly-robust (DR) path threads survey weights through WLS outcome regression, weighted sieve propensity ratios, and survey-weighted kernel smoothing.\n", + "\n", + "For full details, see `docs/survey-roadmap.md`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Summary\n", - "\n", - "**Key takeaways:**\n", - "\n", - "1. **Always specify the survey design** when working with survey data. Ignoring it leads to incorrect standard errors -- typically too small, leading to false positives.\n", - "\n", - "2. **`SurveyDesign`** encapsulates your survey's sampling structure in one object. Pass column names for weights, strata, PSU, and FPC.\n", - "\n", - "3. **Pass `survey_design` to `fit()`** -- the same API works across all estimators. No changes to your estimation code beyond adding one parameter.\n", - "\n", - "4. **CallawaySantAnna** has the most complete survey support: strata/PSU/FPC, replicate weights, analytical and bootstrap SEs, and both panel and cross-section modes.\n", - "\n", - "5. **Replicate weights** (JK1, JKn, BRR, Fay, SDR) are an alternative to strata/PSU/FPC when your survey provides them (e.g., MEPS, ACS PUMS). They are mutually exclusive with strata/PSU/FPC.\n", - "\n", - "6. **Use `subpopulation()`** instead of subsetting when estimating effects for a subgroup. Subsetting drops design information and biases variance estimates.\n", - "\n", - "7. **DEFF diagnostics** help you understand *how* the survey design affects precision. DEFF > 1 means clustering costs exceed stratification gains; DEFF < 1 means the design improves precision for that coefficient. DEFF > 2 indicates substantial clustering.\n", - "\n", - "8. **Repeated cross-sections** (`panel=False`) work with survey design for non-panel surveys like BRFSS, CPS, and ACS 1-year.\n", - "\n", - "**Quick reference:**\n", - "\n", - "| Parameter | When to use |\n", - "|-----------|------------|\n", - "| `weights` | Always -- specify the sampling weight column |\n", - "| `strata` | When the survey uses stratified sampling |\n", - "| `psu` | When multi-stage (clustered) sampling is used |\n", - "| `fpc` | When the sampling fraction is non-negligible |\n", - "| `replicate_weights` | When the survey provides replicate weights instead of strata/PSU/FPC |\n", - "\n", - "**References:**\n", - "\n", - "- Lumley, T. (2004). Analysis of Complex Survey Samples. *Journal of Statistical Software* 9(8).\n", - "- Solon, G., Haider, S. J., & Wooldridge, J. M. (2015). What Are We Weighting For? *Journal of Human Resources* 50(2).\n", - "- Binder, D. A. (1983). On the Variances of Asymptotically Normal Estimators from Complex Surveys. *International Statistical Review* 51(3).\n", - "- Rao, J. N. K., Wu, C. F. J., & Yue, K. (1992). Some Recent Work on Resampling Methods for Complex Surveys. *Survey Methodology* 18(2).\n", - "- Callaway, B. & Sant'Anna, P. H. C. (2021). Difference-in-Differences with Multiple Time Periods. *Journal of Econometrics* 225(2).\n", - "- Sant'Anna, P. H. C. & Zhao, J. (2020). Doubly Robust Difference-in-Differences Estimators. *Journal of Econometrics* 219(1).\n" + "## Summary\n\n**Key takeaways:**\n\n1. **Always specify the survey design** when working with survey data. Ignoring it leads to incorrect standard errors -- typically too small, leading to false positives.\n\n2. **`SurveyDesign`** encapsulates your survey's sampling structure in one object. Pass column names for weights, strata, PSU, and FPC.\n\n3. **Pass `survey_design` to `fit()`** -- the same API works across all estimators. No changes to your estimation code beyond adding one parameter.\n\n4. **CallawaySantAnna** has the most complete survey support: strata/PSU/FPC, replicate weights, analytical and bootstrap SEs, and both panel and cross-section modes.\n\n5. **Replicate weights** (JK1, JKn, BRR, Fay) are an alternative to strata/PSU/FPC when your survey provides them (e.g., MEPS, ACS PUMS). They are mutually exclusive with strata/PSU/FPC.\n\n6. **Use `subpopulation()`** instead of subsetting when estimating effects for a subgroup. Subsetting drops design information and biases variance estimates.\n\n7. **DEFF diagnostics** help you understand *how* the survey design affects precision. DEFF > 1 means clustering costs exceed stratification gains; DEFF < 1 means the design improves precision for that coefficient. DEFF > 2 indicates substantial clustering.\n\n8. **Repeated cross-sections** (`panel=False`) work with survey design for non-panel surveys like BRFSS, CPS, and ACS 1-year.\n\n**Quick reference:**\n\n| Parameter | When to use |\n|-----------|------------|\n| `weights` | Always -- specify the sampling weight column |\n| `strata` | When the survey uses stratified sampling |\n| `psu` | When multi-stage (clustered) sampling is used |\n| `fpc` | When the sampling fraction is non-negligible |\n| `replicate_weights` | When the survey provides replicate weights instead of strata/PSU/FPC |\n\n**References:**\n\n- Lumley, T. (2004). Analysis of Complex Survey Samples. *Journal of Statistical Software* 9(8).\n- Solon, G., Haider, S. J., & Wooldridge, J. M. (2015). What Are We Weighting For? *Journal of Human Resources* 50(2).\n- Binder, D. A. (1983). On the Variances of Asymptotically Normal Estimators from Complex Surveys. *International Statistical Review* 51(3).\n- Rao, J. N. K., Wu, C. F. J., & Yue, K. (1992). Some Recent Work on Resampling Methods for Complex Surveys. *Survey Methodology* 18(2).\n- Callaway, B. & Sant'Anna, P. H. C. (2021). Difference-in-Differences with Multiple Time Periods. *Journal of Econometrics* 225(2).\n- Sant'Anna, P. H. C. & Zhao, J. (2020). Doubly Robust Difference-in-Differences Estimators. *Journal of Econometrics* 219(1)." ] } ], @@ -593,4 +579,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} +} \ No newline at end of file diff --git a/tests/test_survey_phase3.py b/tests/test_survey_phase3.py index 2148ebd2..99c2d1da 100644 --- a/tests/test_survey_phase3.py +++ b/tests/test_survey_phase3.py @@ -1044,6 +1044,66 @@ def test_bootstrap_covariates_survey(self, cov_survey_data): assert np.isfinite(result.overall_se) assert result.overall_se > 0 + def test_analytical_se_differs_from_unweighted(self, cov_survey_data): + """Survey analytical SE should differ from unweighted SE.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result_survey = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_nosurv = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + ) + # Non-uniform weights (1.0 + 0.3*stratum) should produce different SEs + assert result_survey.overall_se != result_nosurv.overall_se + assert np.isfinite(result_survey.overall_se) + assert result_survey.overall_se > 0 + + def test_bootstrap_se_in_ballpark_of_analytical(self, cov_survey_data): + """Bootstrap SE should be in same ballpark as analytical SE.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result_analytical = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_boot = EfficientDiD(n_bootstrap=199, seed=42).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + ratio = result_boot.overall_se / result_analytical.overall_se + assert 0.3 < ratio < 3.0, ( + f"Bootstrap/analytical SE ratio {ratio:.2f} outside [0.3, 3.0]" + ) + + def test_zero_weight_cohort_skipped(self, cov_survey_data): + """Zero-weight cohort should be skipped with a warning.""" + from diff_diff import EfficientDiD + + # Set early cohort (first_treat=4) weights to near-zero + cov_survey_data = cov_survey_data.copy() + cov_survey_data.loc[cov_survey_data["first_treat"] == 4, "weight"] = 1e-15 + sd = SurveyDesign(weights="weight") + result = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + # ============================================================================= # Scale Invariance (applies to all estimators) From aca9f8bf00dd7d84fa24ea5905ace71101f28e5c Mon Sep 17 00:00:00 2001 From: igerber Date: Sat, 4 Apr 2026 11:21:47 -0400 Subject: [PATCH 3/6] Fix IF-scale mismatch in survey aggregation, bootstrap scores, zero-weight guards - Switch aggregated survey SEs from compute_survey_vcov() to compute_survey_if_variance() with score-level psi values, avoiding double-weighting of the survey-weighted WIF term - Bootstrap now perturbs survey-score object w_i*eif_i/sum(w) instead of raw eif_i/n, matching analytical variance convention - Filter zero-weight comparison groups from valid pairs before nuisance estimation; guard in estimate_inverse_propensity_sieve fallback - Fix zero-weight test to use actual 0.0 weights with pytest.warns Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/efficient_did.py | 54 +++++++++++++++++++++++---- diff_diff/efficient_did_bootstrap.py | 17 ++++----- diff_diff/efficient_did_covariates.py | 6 ++- tests/test_survey_phase3.py | 19 ++++++---- 4 files changed, 70 insertions(+), 26 deletions(-) diff --git a/diff_diff/efficient_did.py b/diff_diff/efficient_did.py index 1cb529b4..c9a0bf23 100644 --- a/diff_diff/efficient_did.py +++ b/diff_diff/efficient_did.py @@ -606,6 +606,15 @@ def fit( stacklevel=2, ) + # Guard: never-treated with zero survey weight → no valid comparisons + if cohort_fractions.get(np.inf, 0.0) <= 0 and use_covariates: + warnings.warn( + "Never-treated group has zero survey weight; no valid " + "comparisons possible for covariates path.", + UserWarning, + stacklevel=2, + ) + # ----- Covariate preparation (if provided) ----- covariate_matrix: Optional[np.ndarray] = None m_hat_cache: Dict[Tuple, np.ndarray] = {} @@ -705,6 +714,15 @@ def fit( anticipation=self.anticipation, ) + # Filter out comparison pairs with zero survey weight + if unit_level_weights is not None and pairs: + pairs = [ + (gp, tpre) for gp, tpre in pairs + if np.sum(unit_level_weights[ + never_treated_mask if np.isinf(gp) else cohort_masks[gp] + ]) > 0 + ] + if not pairs: warnings.warn( f"No valid comparison pairs for (g={g}, t={t}). " "ATT will be NaN.", @@ -1256,11 +1274,21 @@ def _aggregate_overall( keepers, effects, unit_cohorts, cohort_fractions, n_units, unit_weights=self._unit_level_weights, ) - agg_eif_total = agg_eif + wif # both O(1) scale - - # SE = sqrt(mean(EIF^2) / n) — standard IF-based SE - # (dispatches to survey TSL or cluster-robust when active) - se = self._eif_se(agg_eif_total, n_units, cluster_indices, n_clusters) + # Compute SE: survey path uses score-level psi + compute_survey_if_variance + # to avoid double-weighting (compute_survey_vcov applies w_i internally, + # which would double-weight the survey-weighted WIF term). + if self._unit_resolved_survey is not None: + uw = self._unit_level_weights + total_w = float(np.sum(uw)) + # Score-level: standard = w*eif/sum(w), wif already has w_i in indicator + psi_total = uw * agg_eif / total_w + wif / total_w + from diff_diff.survey import compute_survey_if_variance + + variance = compute_survey_if_variance(psi_total, self._unit_resolved_survey) + se = float(np.sqrt(max(variance, 0.0))) if np.isfinite(variance) else np.nan + else: + agg_eif_total = agg_eif + wif + se = self._eif_se(agg_eif_total, n_units, cluster_indices, n_clusters) return overall_att, se @@ -1346,16 +1374,26 @@ def _aggregate_event_study( agg_eif += w[k] * eif_by_gt[gt] # WIF correction for event-study aggregation + wif_e = np.zeros(n_units) if unit_cohorts is not None: es_keepers = [(g, t) for (g, t) in gt_pairs] es_effects = effs - wif = self._compute_wif_contribution( + wif_e = self._compute_wif_contribution( es_keepers, es_effects, unit_cohorts, cohort_fractions, n_units, unit_weights=self._unit_level_weights, ) - agg_eif = agg_eif + wif - agg_se = self._eif_se(agg_eif, n_units, cluster_indices, n_clusters) + if self._unit_resolved_survey is not None: + uw = self._unit_level_weights + total_w = float(np.sum(uw)) + psi_total = uw * agg_eif / total_w + wif_e / total_w + from diff_diff.survey import compute_survey_if_variance + + variance = compute_survey_if_variance(psi_total, self._unit_resolved_survey) + agg_se = float(np.sqrt(max(variance, 0.0))) if np.isfinite(variance) else np.nan + else: + agg_eif = agg_eif + wif_e + agg_se = self._eif_se(agg_eif, n_units, cluster_indices, n_clusters) t_stat, p_val, ci = safe_inference( agg_eff, agg_se, alpha=self.alpha, df=self._survey_df diff --git a/diff_diff/efficient_did_bootstrap.py b/diff_diff/efficient_did_bootstrap.py index 03ba4077..26ad55ed 100644 --- a/diff_diff/efficient_did_bootstrap.py +++ b/diff_diff/efficient_did_bootstrap.py @@ -137,19 +137,18 @@ def _run_multiplier_bootstrap( original_atts = np.array([group_time_effects[gt]["effect"] for gt in gt_pairs]) # Perturbed ATTs: (n_bootstrap, n_gt) - # Under survey design, normalize by sum(survey_weights) instead of n_units - # (pweights are normalized to mean=1, so numerically equivalent, but explicit - # for robustness against future weight types) - denom = ( - float(np.sum(unit_level_weights)) - if unit_level_weights is not None - else float(n_units) - ) + # Under survey design, perturb survey-score object w_i * eif_i / sum(w) + # to match the analytical variance convention (compute_survey_if_variance). bootstrap_atts = np.zeros((self.n_bootstrap, n_gt)) for j, gt in enumerate(gt_pairs): eif_gt = eif_by_gt[gt] # shape (n_units,) with np.errstate(divide="ignore", invalid="ignore", over="ignore"): - perturbation = (all_weights @ eif_gt) / denom + if unit_level_weights is not None: + total_w = float(np.sum(unit_level_weights)) + eif_scaled = unit_level_weights * eif_gt / total_w + perturbation = all_weights @ eif_scaled + else: + perturbation = (all_weights @ eif_gt) / n_units bootstrap_atts[:, j] = original_atts[j] + perturbation # Post-treatment mask — also exclude NaN effects diff --git a/diff_diff/efficient_did_covariates.py b/diff_diff/efficient_did_covariates.py index 167985de..bb19ddf5 100644 --- a/diff_diff/efficient_did_covariates.py +++ b/diff_diff/efficient_did_covariates.py @@ -364,8 +364,12 @@ def estimate_inverse_propensity_sieve( # Weighted loss normalization and fallback if unit_weights is not None: w_group = unit_weights[group_mask] + sum_w_group = float(np.sum(w_group)) + if sum_w_group <= 0: + # Zero survey weight for this group — return unconditional fallback + return np.ones(n_units) n_units_w = float(np.sum(unit_weights)) - fallback_ratio = n_units_w / float(np.sum(w_group)) + fallback_ratio = n_units_w / sum_w_group else: w_group = None n_units_w = float(n_units) diff --git a/tests/test_survey_phase3.py b/tests/test_survey_phase3.py index 99c2d1da..6bc8eebb 100644 --- a/tests/test_survey_phase3.py +++ b/tests/test_survey_phase3.py @@ -1091,16 +1091,19 @@ def test_zero_weight_cohort_skipped(self, cov_survey_data): """Zero-weight cohort should be skipped with a warning.""" from diff_diff import EfficientDiD - # Set early cohort (first_treat=4) weights to near-zero + # Set early cohort (first_treat=4) weights to exactly zero cov_survey_data = cov_survey_data.copy() - cov_survey_data.loc[cov_survey_data["first_treat"] == 4, "weight"] = 1e-15 + cov_survey_data.loc[cov_survey_data["first_treat"] == 4, "weight"] = 0.0 + # Need small positive weight for pweight validation (can't be all zero) + # Keep remaining cohorts with positive weights sd = SurveyDesign(weights="weight") - result = EfficientDiD(n_bootstrap=0).fit( - cov_survey_data, - "outcome", "unit", "time", "first_treat", - covariates=["x1"], - survey_design=sd, - ) + with pytest.warns(UserWarning, match="zero survey weight"): + result = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) assert np.isfinite(result.overall_att) assert np.isfinite(result.overall_se) From 42d3dffdd4262f70abcb429844b987509a00aab2 Mon Sep 17 00:00:00 2001 From: igerber Date: Sat, 4 Apr 2026 11:30:37 -0400 Subject: [PATCH 4/6] Restore replicate-weight dispatch in aggregation, raise on zero-weight never-treated MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add uses_replicate_variance branching in _aggregate_overall() and _aggregate_event_study() to route replicate designs to compute_replicate_if_variance() instead of compute_survey_if_variance() - Change zero-weight never-treated guard from warning to ValueError for covariates path — DR nuisance estimation requires positive-weight controls - Add test_zero_weight_never_treated_raises for the new error path Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/efficient_did.py | 46 ++++++++++++++++++++++++++----------- tests/test_survey_phase3.py | 19 ++++++++++++--- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/diff_diff/efficient_did.py b/diff_diff/efficient_did.py index c9a0bf23..49831151 100644 --- a/diff_diff/efficient_did.py +++ b/diff_diff/efficient_did.py @@ -608,11 +608,10 @@ def fit( # Guard: never-treated with zero survey weight → no valid comparisons if cohort_fractions.get(np.inf, 0.0) <= 0 and use_covariates: - warnings.warn( - "Never-treated group has zero survey weight; no valid " - "comparisons possible for covariates path.", - UserWarning, - stacklevel=2, + raise ValueError( + "Never-treated group has zero survey weight. The doubly " + "robust covariates path requires a never-treated control " + "group with positive survey weight for nuisance estimation." ) # ----- Covariate preparation (if provided) ----- @@ -1274,17 +1273,27 @@ def _aggregate_overall( keepers, effects, unit_cohorts, cohort_fractions, n_units, unit_weights=self._unit_level_weights, ) - # Compute SE: survey path uses score-level psi + compute_survey_if_variance - # to avoid double-weighting (compute_survey_vcov applies w_i internally, - # which would double-weight the survey-weighted WIF term). + # Compute SE: survey path uses score-level psi to avoid double-weighting + # (compute_survey_vcov applies w_i internally, which would double-weight + # the survey-weighted WIF term). Dispatch replicate vs TSL. if self._unit_resolved_survey is not None: uw = self._unit_level_weights total_w = float(np.sum(uw)) - # Score-level: standard = w*eif/sum(w), wif already has w_i in indicator psi_total = uw * agg_eif / total_w + wif / total_w - from diff_diff.survey import compute_survey_if_variance - variance = compute_survey_if_variance(psi_total, self._unit_resolved_survey) + if (hasattr(self._unit_resolved_survey, 'uses_replicate_variance') + and self._unit_resolved_survey.uses_replicate_variance): + from diff_diff.survey import compute_replicate_if_variance + + variance, _ = compute_replicate_if_variance( + psi_total, self._unit_resolved_survey + ) + else: + from diff_diff.survey import compute_survey_if_variance + + variance = compute_survey_if_variance( + psi_total, self._unit_resolved_survey + ) se = float(np.sqrt(max(variance, 0.0))) if np.isfinite(variance) else np.nan else: agg_eif_total = agg_eif + wif @@ -1387,9 +1396,20 @@ def _aggregate_event_study( uw = self._unit_level_weights total_w = float(np.sum(uw)) psi_total = uw * agg_eif / total_w + wif_e / total_w - from diff_diff.survey import compute_survey_if_variance - variance = compute_survey_if_variance(psi_total, self._unit_resolved_survey) + if (hasattr(self._unit_resolved_survey, 'uses_replicate_variance') + and self._unit_resolved_survey.uses_replicate_variance): + from diff_diff.survey import compute_replicate_if_variance + + variance, _ = compute_replicate_if_variance( + psi_total, self._unit_resolved_survey + ) + else: + from diff_diff.survey import compute_survey_if_variance + + variance = compute_survey_if_variance( + psi_total, self._unit_resolved_survey + ) agg_se = float(np.sqrt(max(variance, 0.0))) if np.isfinite(variance) else np.nan else: agg_eif = agg_eif + wif_e diff --git a/tests/test_survey_phase3.py b/tests/test_survey_phase3.py index 6bc8eebb..ba43b394 100644 --- a/tests/test_survey_phase3.py +++ b/tests/test_survey_phase3.py @@ -1088,14 +1088,12 @@ def test_bootstrap_se_in_ballpark_of_analytical(self, cov_survey_data): ) def test_zero_weight_cohort_skipped(self, cov_survey_data): - """Zero-weight cohort should be skipped with a warning.""" + """Zero-weight treated cohort should be skipped with a warning.""" from diff_diff import EfficientDiD # Set early cohort (first_treat=4) weights to exactly zero cov_survey_data = cov_survey_data.copy() cov_survey_data.loc[cov_survey_data["first_treat"] == 4, "weight"] = 0.0 - # Need small positive weight for pweight validation (can't be all zero) - # Keep remaining cohorts with positive weights sd = SurveyDesign(weights="weight") with pytest.warns(UserWarning, match="zero survey weight"): result = EfficientDiD(n_bootstrap=0).fit( @@ -1107,6 +1105,21 @@ def test_zero_weight_cohort_skipped(self, cov_survey_data): assert np.isfinite(result.overall_att) assert np.isfinite(result.overall_se) + def test_zero_weight_never_treated_raises(self, cov_survey_data): + """Zero-weight never-treated group should raise ValueError.""" + from diff_diff import EfficientDiD + + cov_survey_data = cov_survey_data.copy() + cov_survey_data.loc[cov_survey_data["first_treat"] == 0, "weight"] = 0.0 + sd = SurveyDesign(weights="weight") + with pytest.raises(ValueError, match="zero survey weight"): + EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + # ============================================================================= # Scale Invariance (applies to all estimators) From d45df253a0eb3f71f67dfc640ddf8ab92308cbae Mon Sep 17 00:00:00 2001 From: igerber Date: Sat, 4 Apr 2026 12:24:50 -0400 Subject: [PATCH 5/6] Add replicate-weight aggregation regression test for DR+survey path Exercises the compute_replicate_if_variance dispatch in _aggregate_overall and _aggregate_event_study with covariates + replicate-weight survey design, guarding against future regressions to the wrong variance helper. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_survey_phase3.py | 52 +++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/test_survey_phase3.py b/tests/test_survey_phase3.py index ba43b394..a128ab08 100644 --- a/tests/test_survey_phase3.py +++ b/tests/test_survey_phase3.py @@ -1120,6 +1120,58 @@ def test_zero_weight_never_treated_raises(self, cov_survey_data): survey_design=sd, ) + def test_replicate_weight_aggregation(self): + """Replicate-weight aggregation SEs use compute_replicate_if_variance.""" + from diff_diff import EfficientDiD + + # Reuse the fixture from test_survey_phase6 + np.random.seed(42) + n_units, n_periods, n_rep = 60, 6, 15 + # Generate per-unit replicate weights (constant within unit across time) + unit_repwts = {} + for unit in range(n_units): + wt = 1.0 + 0.3 * (unit % 5) + unit_repwts[unit] = { + f"repwt_{r}": wt * (0.5 + np.random.random()) + for r in range(n_rep) + } + rows = [] + for unit in range(n_units): + ft = 3 if unit < 20 else (5 if unit < 40 else 0) + wt = 1.0 + 0.3 * (unit % 5) + x1 = np.random.randn() + for t in range(1, n_periods + 1): + y = 10.0 + unit * 0.05 + t * 0.2 + 0.5 * x1 + if ft > 0 and t >= ft: + y += 2.0 + y += np.random.normal(0, 0.5) + row = {"unit": unit, "time": t, "first_treat": ft, + "outcome": y, "weight": wt, "x1": x1} + row.update(unit_repwts[unit]) + rows.append(row) + import pandas as pd + data = pd.DataFrame(rows) + rep_cols = [f"repwt_{r}" for r in range(n_rep)] + + sd = SurveyDesign( + weights="weight", replicate_weights=rep_cols, + replicate_method="JK1", + ) + result = EfficientDiD(n_bootstrap=0).fit( + data, "outcome", "unit", "time", "first_treat", + covariates=["x1"], + aggregate="event_study", + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + assert result.event_study_effects is not None + for _, eff in result.event_study_effects.items(): + assert np.isfinite(eff["effect"]) + assert np.isfinite(eff["se"]) + assert eff["se"] > 0 + # ============================================================================= # Scale Invariance (applies to all estimators) From ca9522c4c204a48838c46d327a0856026e88efc3 Mon Sep 17 00:00:00 2001 From: igerber Date: Sat, 4 Apr 2026 12:37:32 -0400 Subject: [PATCH 6/6] Extend zero-weight never-treated guard to nocov path, fix roadmap inconsistency - Remove `and use_covariates` condition from zero-weight never-treated guard so both DR and nocov survey paths fail fast with ValueError - Remove stale covariates+survey limitation entry from survey-roadmap.md deferred list (now resolved) - Add test_zero_weight_never_treated_nocov_raises for nocov path Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/efficient_did.py | 9 +++++---- docs/survey-roadmap.md | 1 - tests/test_survey_phase3.py | 16 +++++++++++++++- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/diff_diff/efficient_did.py b/diff_diff/efficient_did.py index 49831151..138c6790 100644 --- a/diff_diff/efficient_did.py +++ b/diff_diff/efficient_did.py @@ -607,11 +607,12 @@ def fit( ) # Guard: never-treated with zero survey weight → no valid comparisons - if cohort_fractions.get(np.inf, 0.0) <= 0 and use_covariates: + # Applies to both covariates (DR nuisance) and nocov (weighted means) paths + if cohort_fractions.get(np.inf, 0.0) <= 0 and unit_level_weights is not None: raise ValueError( - "Never-treated group has zero survey weight. The doubly " - "robust covariates path requires a never-treated control " - "group with positive survey weight for nuisance estimation." + "Never-treated group has zero survey weight. EfficientDiD " + "requires a never-treated control group with positive " + "survey weight for estimation." ) # ----- Covariate preparation (if provided) ----- diff --git a/docs/survey-roadmap.md b/docs/survey-roadmap.md index 274f9c6e..6992be62 100644 --- a/docs/survey-roadmap.md +++ b/docs/survey-roadmap.md @@ -302,7 +302,6 @@ multi-absorb + survey weights). | Limitation | Reason | |-----------|--------| -| `covariates` + `survey_design` | DR nuisance path doesn't thread survey weights | | `cluster` + `survey_design` | Use `survey_design` with PSU/strata instead | ### Bootstrap + Replicate Weights (Mutual Exclusion) diff --git a/tests/test_survey_phase3.py b/tests/test_survey_phase3.py index a128ab08..fbac504b 100644 --- a/tests/test_survey_phase3.py +++ b/tests/test_survey_phase3.py @@ -1106,7 +1106,7 @@ def test_zero_weight_cohort_skipped(self, cov_survey_data): assert np.isfinite(result.overall_se) def test_zero_weight_never_treated_raises(self, cov_survey_data): - """Zero-weight never-treated group should raise ValueError.""" + """Zero-weight never-treated group should raise ValueError (DR path).""" from diff_diff import EfficientDiD cov_survey_data = cov_survey_data.copy() @@ -1120,6 +1120,20 @@ def test_zero_weight_never_treated_raises(self, cov_survey_data): survey_design=sd, ) + def test_zero_weight_never_treated_nocov_raises(self, cov_survey_data): + """Zero-weight never-treated group should raise ValueError (nocov path).""" + from diff_diff import EfficientDiD + + cov_survey_data = cov_survey_data.copy() + cov_survey_data.loc[cov_survey_data["first_treat"] == 0, "weight"] = 0.0 + sd = SurveyDesign(weights="weight") + with pytest.raises(ValueError, match="zero survey weight"): + EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + survey_design=sd, + ) + def test_replicate_weight_aggregation(self): """Replicate-weight aggregation SEs use compute_replicate_if_variance.""" from diff_diff import EfficientDiD