diff --git a/TODO.md b/TODO.md index 748a2dd8..9ea2f232 100644 --- a/TODO.md +++ b/TODO.md @@ -69,6 +69,7 @@ Deferred items from PR reviews that were not addressed before merge. | TripleDifference power: `generate_ddd_data` is a fixed 2×2×2 cross-sectional DGP — no multi-period or unbalanced-group support. Add a `generate_ddd_panel_data` for panel DDD power analysis. | `prep_dgp.py`, `power.py` | #208 | Low | | ContinuousDiD event-study aggregation anticipation filter — **Resolved**. `_aggregate_event_study()` now filters `e < -anticipation` when `anticipation > 0`, matching CallawaySantAnna behavior. Bootstrap paths also filtered. | `continuous_did.py` | #226 | Resolved | | Survey design resolution/collapse patterns are inconsistent across panel estimators — ContinuousDiD rebuilds unit-level design in SE code, EfficientDiD builds once in fit(), StackedDiD re-resolves on stacked data; extract shared helpers for panel-to-unit collapse, post-filter re-resolution, and metadata recomputation | `continuous_did.py`, `efficient_did.py`, `stacked_did.py` | #226 | Low | +| Survey-weighted Silverman bandwidth in EfficientDiD conditional Omega* — `_silverman_bandwidth()` uses unweighted mean/std for bandwidth selection; survey-weighted statistics would better reflect the population distribution but is a second-order refinement | `efficient_did_covariates.py` | — | Low | | Survey metadata formatting dedup — **Resolved**. Extracted `_format_survey_block()` helper in `results.py`, replaced 13 occurrences across 11 files. | `results.py` + 10 results files | — | Resolved | | TROP: `fit()` and `_fit_global()` share ~150 lines of near-identical data setup (panel pivoting, absorbing-state validation, first-treatment detection, effective rank, NaN warnings). Both bootstrap methods also duplicate the stratified resampling loop. Extract shared helpers to eliminate cross-file sync risk. 
| `trop.py`, `trop_global.py`, `trop_local.py` | — | Low | | StaggeredTripleDifference R cross-validation: CSV fixtures not committed (gitignored); tests skip without local R + triplediff. Commit fixtures or generate deterministically. | `tests/test_methodology_staggered_triple_diff.py` | #245 | Medium | diff --git a/diff_diff/efficient_did.py b/diff_diff/efficient_did.py index d25f0512..138c6790 100644 --- a/diff_diff/efficient_did.py +++ b/diff_diff/efficient_did.py @@ -347,8 +347,6 @@ def fit( ValueError Missing columns, unbalanced panel, non-absorbing treatment, or PT-Post without a never-treated group. - NotImplementedError - If ``covariates`` and ``survey_design`` are both set. """ self._validate_params() @@ -381,16 +379,6 @@ def fit( # Bootstrap + survey supported via PSU-level multiplier bootstrap. - # Guard covariates + survey (DR path does not yet thread survey weights) - if covariates is not None and len(covariates) > 0 and resolved_survey is not None: - raise NotImplementedError( - "Survey weights with covariates are not yet supported for " - "EfficientDiD. The doubly robust covariate path does not " - "thread survey weights through nuisance estimation. " - "Use covariates=None with survey_design, or drop survey_design " - "when using covariates." 
- ) - # Normalize empty covariates list to None (use nocov path) if covariates is not None and len(covariates) == 0: covariates = None @@ -583,6 +571,7 @@ def fit( # Use the resolved survey's weights (already normalized per weight_type) # subset to unit level via _unit_first_panel_row (aligned to all_units) unit_level_weights = self._unit_resolved_survey.weights + self._unit_level_weights = unit_level_weights cohort_fractions: Dict[float, float] = {} if unit_level_weights is not None: @@ -617,6 +606,15 @@ def fit( stacklevel=2, ) + # Guard: never-treated with zero survey weight → no valid comparisons + # Applies to both covariates (DR nuisance) and nocov (weighted means) paths + if cohort_fractions.get(np.inf, 0.0) <= 0 and unit_level_weights is not None: + raise ValueError( + "Never-treated group has zero survey weight. EfficientDiD " + "requires a never-treated control group with positive " + "survey weight for estimation." + ) + # ----- Covariate preparation (if provided) ----- covariate_matrix: Optional[np.ndarray] = None m_hat_cache: Dict[Tuple, np.ndarray] = {} @@ -686,6 +684,15 @@ def fit( else: effective_p1_col = period_1_col + # Guard: skip cohorts with zero survey weight (all units zero-weighted) + if cohort_fractions[g] <= 0: + warnings.warn( + f"Cohort {g} has zero survey weight; skipping.", + UserWarning, + stacklevel=2, + ) + continue + # Estimate all (g, t) cells including pre-treatment. Under PT-Post, # pre-treatment cells serve as placebo/pre-trend diagnostics, matching # the CallawaySantAnna implementation. Users filter to t >= g for @@ -707,6 +714,15 @@ def fit( anticipation=self.anticipation, ) + # Filter out comparison pairs with zero survey weight + if unit_level_weights is not None and pairs: + pairs = [ + (gp, tpre) for gp, tpre in pairs + if np.sum(unit_level_weights[ + never_treated_mask if np.isinf(gp) else cohort_masks[gp] + ]) > 0 + ] + if not pairs: warnings.warn( f"No valid comparison pairs for (g={g}, t={t}). 
" "ATT will be NaN.", @@ -742,6 +758,7 @@ def fit( never_treated_mask, t_col_val, tpre_col_val, + unit_weights=unit_level_weights, ) # m_{g', tpre, 1}(X) key_gp_tpre = (gp, tpre_col_val, effective_p1_col) @@ -755,6 +772,7 @@ def fit( gp_mask_for_reg, tpre_col_val, effective_p1_col, + unit_weights=unit_level_weights, ) # r_{g, inf}(X) and r_{g, g'}(X) via sieve (Eq 4.1-4.2) for comp in {np.inf, gp}: @@ -770,6 +788,7 @@ def fit( k_max=self.sieve_k_max, criterion=self.sieve_criterion, ratio_clip=self.ratio_clip, + unit_weights=unit_level_weights, ) # Per-unit DR generated outcomes: shape (n_units, H) @@ -801,6 +820,7 @@ def fit( group_mask_s, k_max=self.sieve_k_max, criterion=self.sieve_criterion, + unit_weights=unit_level_weights, ) # Conditional Omega*(X) with per-unit propensities (Eq 3.12) @@ -817,14 +837,19 @@ def fit( covariate_matrix=covariate_matrix, s_hat_cache=s_hat_cache, bandwidth=self.kernel_bandwidth, + unit_weights=unit_level_weights, ) # Per-unit weights: (n_units, H) per_unit_w = compute_per_unit_weights(omega_cond) - # ATT = mean_i( w(X_i) @ gen_out[i] ) + # ATT = (survey-)weighted mean of per-unit DR scores if per_unit_w.shape[1] > 0: - att_gt = float(np.mean(np.sum(per_unit_w * gen_out, axis=1))) + per_unit_scores = np.sum(per_unit_w * gen_out, axis=1) + if unit_level_weights is not None: + att_gt = float(np.average(per_unit_scores, weights=unit_level_weights)) + else: + att_gt = float(np.mean(per_unit_scores)) else: att_gt = np.nan @@ -979,6 +1004,7 @@ def fit( cluster_indices=unit_cluster_indices, n_clusters=n_clusters, resolved_survey=self._unit_resolved_survey, + unit_level_weights=self._unit_level_weights, ) # Update estimates with bootstrap inference overall_se = bootstrap_results.overall_att_se @@ -1140,6 +1166,7 @@ def _compute_wif_contribution( unit_cohorts: np.ndarray, cohort_fractions: Dict[float, float], n_units: int, + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: """Compute weight influence function correction (O(1) 
scale, matching EIF). @@ -1159,6 +1186,9 @@ def _compute_wif_contribution( ``{cohort: n_cohort / n}`` for each cohort. n_units : int Total number of units. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses the + survey-weighted WIF formula: IF_i(p_g) = (w_i * 1{G_i=g} - pg_k). Returns ------- @@ -1172,10 +1202,19 @@ def _compute_wif_contribution( return np.zeros(n_units) indicator = (unit_cohorts[:, None] == groups_for_keepers[None, :]).astype(float) - indicator_sum = np.sum(indicator - pg_keepers, axis=1) + + if unit_weights is not None: + # Survey-weighted WIF (matches staggered_aggregation.py:392-401): + # IF_i(p_g) = (w_i * 1{G_i=g} - pg_k), NOT (1{G_i=g} - pg_k) + weighted_indicator = indicator * unit_weights[:, None] + indicator_diff = weighted_indicator - pg_keepers + indicator_sum = np.sum(indicator_diff, axis=1) + else: + indicator_diff = indicator - pg_keepers + indicator_sum = np.sum(indicator_diff, axis=1) with np.errstate(divide="ignore", invalid="ignore", over="ignore"): - if1 = (indicator - pg_keepers) / sum_pg + if1 = indicator_diff / sum_pg if2 = np.outer(indicator_sum, pg_keepers) / sum_pg**2 wif_matrix = if1 - if2 wif_contrib = wif_matrix @ effects @@ -1232,13 +1271,34 @@ def _aggregate_overall( # WIF correction: accounts for uncertainty in cohort-size weights wif = self._compute_wif_contribution( - keepers, effects, unit_cohorts, cohort_fractions, n_units + keepers, effects, unit_cohorts, cohort_fractions, n_units, + unit_weights=self._unit_level_weights, ) - agg_eif_total = agg_eif + wif # both O(1) scale + # Compute SE: survey path uses score-level psi to avoid double-weighting + # (compute_survey_vcov applies w_i internally, which would double-weight + # the survey-weighted WIF term). Dispatch replicate vs TSL. 
+ if self._unit_resolved_survey is not None: + uw = self._unit_level_weights + total_w = float(np.sum(uw)) + psi_total = uw * agg_eif / total_w + wif / total_w + + if (hasattr(self._unit_resolved_survey, 'uses_replicate_variance') + and self._unit_resolved_survey.uses_replicate_variance): + from diff_diff.survey import compute_replicate_if_variance + + variance, _ = compute_replicate_if_variance( + psi_total, self._unit_resolved_survey + ) + else: + from diff_diff.survey import compute_survey_if_variance - # SE = sqrt(mean(EIF^2) / n) — standard IF-based SE - # (dispatches to survey TSL or cluster-robust when active) - se = self._eif_se(agg_eif_total, n_units, cluster_indices, n_clusters) + variance = compute_survey_if_variance( + psi_total, self._unit_resolved_survey + ) + se = float(np.sqrt(max(variance, 0.0))) if np.isfinite(variance) else np.nan + else: + agg_eif_total = agg_eif + wif + se = self._eif_se(agg_eif_total, n_units, cluster_indices, n_clusters) return overall_att, se @@ -1324,15 +1384,37 @@ def _aggregate_event_study( agg_eif += w[k] * eif_by_gt[gt] # WIF correction for event-study aggregation + wif_e = np.zeros(n_units) if unit_cohorts is not None: es_keepers = [(g, t) for (g, t) in gt_pairs] es_effects = effs - wif = self._compute_wif_contribution( - es_keepers, es_effects, unit_cohorts, cohort_fractions, n_units + wif_e = self._compute_wif_contribution( + es_keepers, es_effects, unit_cohorts, cohort_fractions, n_units, + unit_weights=self._unit_level_weights, ) - agg_eif = agg_eif + wif - agg_se = self._eif_se(agg_eif, n_units, cluster_indices, n_clusters) + if self._unit_resolved_survey is not None: + uw = self._unit_level_weights + total_w = float(np.sum(uw)) + psi_total = uw * agg_eif / total_w + wif_e / total_w + + if (hasattr(self._unit_resolved_survey, 'uses_replicate_variance') + and self._unit_resolved_survey.uses_replicate_variance): + from diff_diff.survey import compute_replicate_if_variance + + variance, _ = 
compute_replicate_if_variance( + psi_total, self._unit_resolved_survey + ) + else: + from diff_diff.survey import compute_survey_if_variance + + variance = compute_survey_if_variance( + psi_total, self._unit_resolved_survey + ) + agg_se = float(np.sqrt(max(variance, 0.0))) if np.isfinite(variance) else np.nan + else: + agg_eif = agg_eif + wif_e + agg_se = self._eif_se(agg_eif, n_units, cluster_indices, n_clusters) t_stat, p_val, ci = safe_inference( agg_eff, agg_se, alpha=self.alpha, df=self._survey_df diff --git a/diff_diff/efficient_did_bootstrap.py b/diff_diff/efficient_did_bootstrap.py index cbeb8161..26ad55ed 100644 --- a/diff_diff/efficient_did_bootstrap.py +++ b/diff_diff/efficient_did_bootstrap.py @@ -63,6 +63,7 @@ def _run_multiplier_bootstrap( cluster_indices: Optional[np.ndarray] = None, n_clusters: Optional[int] = None, resolved_survey: object = None, + unit_level_weights: Optional[np.ndarray] = None, ) -> EDiDBootstrapResults: """Run multiplier bootstrap on stored EIF values. @@ -136,11 +137,18 @@ def _run_multiplier_bootstrap( original_atts = np.array([group_time_effects[gt]["effect"] for gt in gt_pairs]) # Perturbed ATTs: (n_bootstrap, n_gt) + # Under survey design, perturb survey-score object w_i * eif_i / sum(w) + # to match the analytical variance convention (compute_survey_if_variance). 
bootstrap_atts = np.zeros((self.n_bootstrap, n_gt)) for j, gt in enumerate(gt_pairs): eif_gt = eif_by_gt[gt] # shape (n_units,) with np.errstate(divide="ignore", invalid="ignore", over="ignore"): - perturbation = (all_weights @ eif_gt) / n_units + if unit_level_weights is not None: + total_w = float(np.sum(unit_level_weights)) + eif_scaled = unit_level_weights * eif_gt / total_w + perturbation = all_weights @ eif_scaled + else: + perturbation = (all_weights @ eif_gt) / n_units bootstrap_atts[:, j] = original_atts[j] + perturbation # Post-treatment mask — also exclude NaN effects diff --git a/diff_diff/efficient_did_covariates.py b/diff_diff/efficient_did_covariates.py index 9e3052c2..bb19ddf5 100644 --- a/diff_diff/efficient_did_covariates.py +++ b/diff_diff/efficient_did_covariates.py @@ -37,6 +37,7 @@ def estimate_outcome_regression( group_mask: np.ndarray, t_col: int, tpre_col: int, + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: """Estimate conditional mean outcome change m_hat(X) for a comparison group. @@ -56,6 +57,9 @@ def estimate_outcome_regression( Mask selecting units in the comparison group. t_col, tpre_col : int Column indices in ``outcome_wide`` for the two time periods. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses WLS + instead of OLS for the within-group regression. 
Returns ------- @@ -68,9 +72,13 @@ def estimate_outcome_regression( X_group = covariate_matrix[group_mask] X_design = np.column_stack([np.ones(len(X_group)), X_group]) + w_group = unit_weights[group_mask] if unit_weights is not None else None + coef, _, _ = solve_ols( X_design, delta_y, + weights=w_group, + weight_type="pweight" if w_group is not None else None, return_vcov=False, rank_deficient_action="warn", ) @@ -121,7 +129,9 @@ def _polynomial_sieve_basis(X: np.ndarray, degree: int) -> np.ndarray: """ n, d = X.shape - # Standardize for numerical stability + # Standardize for numerical stability (unweighted mean/std intentional — + # this is only for conditioning, not for the statistical estimand; with + # survey weights the sieve basis is the same, only the objective changes) X_mean = X.mean(axis=0) X_std = X.std(axis=0) X_std[X_std < 1e-10] = 1.0 # avoid division by zero for constant columns @@ -146,6 +156,7 @@ def estimate_propensity_ratio_sieve( k_max: Optional[int] = None, criterion: str = "bic", ratio_clip: float = 20.0, + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: r"""Estimate propensity ratio via sieve convex minimization (Eq 4.1-4.2). @@ -176,6 +187,9 @@ def estimate_propensity_ratio_sieve( ``"aic"`` or ``"bic"``. ratio_clip : float Clip ratios to ``[1/ratio_clip, ratio_clip]``. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses weighted + normal equations for the sieve estimation. 
Returns ------- @@ -197,9 +211,20 @@ def estimate_propensity_ratio_sieve( k_max = max(k_max, 1) # Penalty multiplier for IC + # BIC penalty uses observation count (not weighted) — complexity vs distinct obs n_total = int(np.sum(mask_g)) + n_gp c_n = 2.0 if criterion == "aic" else np.log(max(n_total, 2)) + # Weighted totals for loss normalization (raw probability weights) + if unit_weights is not None: + w_g = unit_weights[mask_g] + w_gp = unit_weights[mask_gp] + n_total_w = float(np.sum(w_g)) + float(np.sum(w_gp)) + else: + w_g = None + w_gp = None + n_total_w = float(n_total) + best_ic = np.inf best_ratio = np.ones(n_units) # fallback: constant ratio 1 @@ -214,9 +239,15 @@ def estimate_propensity_ratio_sieve( Psi_gp = basis_all[mask_gp] # (n_gp, n_basis) Psi_g = basis_all[mask_g] # (n_g, n_basis) - # Normal equations: (Psi_gp' Psi_gp) beta = Psi_g.sum(axis=0) - A = Psi_gp.T @ Psi_gp - b = Psi_g.sum(axis=0) + # Normal equations (weighted when survey weights present): + # Unweighted: (Psi_gp' Psi_gp) beta = Psi_g.sum(axis=0) + # Weighted: (Psi_gp' W_gp Psi_gp) beta = (w_g * Psi_g).sum(axis=0) + if w_gp is not None: + A = Psi_gp.T @ (w_gp[:, None] * Psi_gp) + b = (w_g[:, None] * Psi_g).sum(axis=0) + else: + A = Psi_gp.T @ Psi_gp + b = Psi_g.sum(axis=0) try: beta = np.linalg.solve(A, b) @@ -230,11 +261,12 @@ def estimate_propensity_ratio_sieve( # Predicted ratio for all units r_hat = basis_all @ beta - # IC selection: loss at optimum = -(1/n) * b'beta - # Derivation: L(beta) = (1/n)(beta'A*beta - 2*b'beta). + # IC selection: loss at optimum = -(1/n_w) * b'beta + # Derivation: L(beta) = (1/n_w)(beta'A*beta - 2*b'beta). # At optimum A*beta = b, so beta'A*beta = b'beta. - # Therefore L = (1/n)(b'beta - 2*b'beta) = -(1/n)*b'beta. - loss = -float(b @ beta) / n_total + # Therefore L = (1/n_w)(b'beta - 2*b'beta) = -(1/n_w)*b'beta. + # Loss uses weighted totals; BIC penalty uses observation count. 
+ loss = -float(b @ beta) / n_total_w ic_val = 2.0 * loss + c_n * n_basis / n_total if ic_val < best_ic: @@ -280,6 +312,7 @@ def estimate_inverse_propensity_sieve( group_mask: np.ndarray, k_max: Optional[int] = None, criterion: str = "bic", + unit_weights: Optional[np.ndarray] = None, ) -> np.ndarray: r"""Estimate s_{g'}(X) = 1/p_{g'}(X) via sieve convex minimization. @@ -305,6 +338,9 @@ def estimate_inverse_propensity_sieve( Maximum polynomial degree. None = auto. criterion : str ``"aic"`` or ``"bic"``. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, uses weighted + normal equations for the sieve estimation. Returns ------- @@ -322,10 +358,25 @@ def estimate_inverse_propensity_sieve( k_max = min(int(n_group**0.2), 5) k_max = max(k_max, 1) + # BIC penalty uses observation count (not weighted) c_n = 2.0 if criterion == "aic" else np.log(max(n_units, 2)) + # Weighted loss normalization and fallback + if unit_weights is not None: + w_group = unit_weights[group_mask] + sum_w_group = float(np.sum(w_group)) + if sum_w_group <= 0: + # Zero survey weight for this group — return unconditional fallback + return np.ones(n_units) + n_units_w = float(np.sum(unit_weights)) + fallback_ratio = n_units_w / sum_w_group + else: + w_group = None + n_units_w = float(n_units) + fallback_ratio = n_units / n_group + best_ic = np.inf - best_s = np.full(n_units, n_units / n_group) # fallback: unconditional + best_s = np.full(n_units, fallback_ratio) # fallback: unconditional for K in range(1, k_max + 1): n_basis = comb(K + d, d) @@ -335,9 +386,16 @@ def estimate_inverse_propensity_sieve( basis_all = _polynomial_sieve_basis(covariate_matrix, K) Psi_gp = basis_all[group_mask] - A = Psi_gp.T @ Psi_gp - # RHS: sum of basis over ALL units (not just one group) - b = basis_all.sum(axis=0) + # Normal equations (weighted when survey weights present): + # Unweighted: (Psi_gp' Psi_gp) beta = Psi_all.sum(axis=0) + # Weighted: (Psi_gp' W_group 
Psi_gp) beta = (w_all * Psi_all).sum(axis=0) + if w_group is not None: + A = Psi_gp.T @ (w_group[:, None] * Psi_gp) + b = (unit_weights[:, None] * basis_all).sum(axis=0) + else: + A = Psi_gp.T @ Psi_gp + # RHS: sum of basis over ALL units (not just one group) + b = basis_all.sum(axis=0) try: beta = np.linalg.solve(A, b) @@ -348,8 +406,9 @@ def estimate_inverse_propensity_sieve( s_hat = basis_all @ beta - # IC: loss = -(1/n) * b'beta (same derivation as ratio estimator) - loss = -float(b @ beta) / n_units + # IC: loss = -(1/n_w) * b'beta (same derivation as ratio estimator) + # Loss uses weighted totals; BIC penalty uses observation count. + loss = -float(b @ beta) / n_units_w ic_val = 2.0 * loss + c_n * n_basis / n_units if ic_val < best_ic: @@ -433,6 +492,10 @@ def compute_generated_outcomes_cov( g_mask = cohort_masks[target_g] pi_g = cohort_fractions[target_g] + # Guard: zero survey weight for the target cohort → no DR estimation possible + if pi_g <= 0: + return np.zeros((n_units, H)) + gen_out = np.zeros((n_units, H)) for j, (gp, tpre) in enumerate(valid_pairs): @@ -496,6 +559,7 @@ def _kernel_weights_matrix( X_all: np.ndarray, X_group: np.ndarray, bandwidth: float, + group_weights: Optional[np.ndarray] = None, ) -> np.ndarray: """Gaussian kernel weight matrix. @@ -503,11 +567,21 @@ def _kernel_weights_matrix( normalized kernel weight ``K_h(X_group[j], X_all[i])``. Each row sums to 1 (Nadaraya-Watson normalization). + + Parameters + ---------- + group_weights : ndarray, shape (n_group,), optional + Survey weights for the group units. When provided, kernel + weights are multiplied by survey weights before row-normalization, + making the Nadaraya-Watson estimator survey-weighted. 
""" # Squared distances: (n_all, n_group) dist_sq = cdist(X_all, X_group, metric="sqeuclidean") # Gaussian kernel raw = np.exp(-dist_sq / (2.0 * bandwidth**2)) + # Survey-weight: each group unit j contributes ∝ w_j * K_h(X_i, X_j) + if group_weights is not None: + raw = raw * group_weights[np.newaxis, :] # Normalize each row row_sums = raw.sum(axis=1, keepdims=True) row_sums[row_sums < 1e-15] = 1.0 # avoid division by zero @@ -559,6 +633,7 @@ def compute_omega_star_conditional( covariate_matrix: np.ndarray, s_hat_cache: Dict[float, np.ndarray], bandwidth: Optional[float] = None, + unit_weights: Optional[np.ndarray] = None, never_treated_val: float = np.inf, ) -> np.ndarray: r"""Kernel-smoothed conditional Omega\*(X_i) for each unit (Eq 3.12). @@ -583,6 +658,9 @@ def compute_omega_star_conditional( value is shape ``(n_units,)``. Keyed by group identifier. bandwidth : float or None Kernel bandwidth. None = Silverman's rule. + unit_weights : ndarray, shape (n_units,), optional + Survey weights at the unit level. When provided, kernel-smoothed + covariances use survey-weighted Nadaraya-Watson regression. 
never_treated_val : float Returns @@ -622,13 +700,17 @@ def compute_omega_star_conditional( stacklevel=2, ) + # Per-group survey weights for kernel smoothing + w_g = unit_weights[g_mask] if unit_weights is not None else None + w_inf = unit_weights[never_treated_mask] if unit_weights is not None else None + # Pre-compute kernel weight matrices per group Y_g = outcome_wide[g_mask] X_g = covariate_matrix[g_mask] Yg_t_minus_1 = Y_g[:, t_col] - Y_g[:, y1_col] - W_g = _kernel_weights_matrix(covariate_matrix, X_g, bandwidth) - W_inf = _kernel_weights_matrix(covariate_matrix, X_inf, bandwidth) + W_g = _kernel_weights_matrix(covariate_matrix, X_g, bandwidth, group_weights=w_g) + W_inf = _kernel_weights_matrix(covariate_matrix, X_inf, bandwidth, group_weights=w_inf) inf_t_minus_tpre = {} for _, tpre in valid_pairs: @@ -683,7 +765,10 @@ def compute_omega_star_conditional( ) if gp_j not in W_gp_cache: X_gp = covariate_matrix[cohort_masks[gp_j]] - W_gp_cache[gp_j] = _kernel_weights_matrix(covariate_matrix, X_gp, bandwidth) + w_gp_j = unit_weights[cohort_masks[gp_j]] if unit_weights is not None else None + W_gp_cache[gp_j] = _kernel_weights_matrix( + covariate_matrix, X_gp, bandwidth, group_weights=w_gp_j + ) gp_outcomes_cache[gp_j] = outcome_wide[cohort_masks[gp_j]] W_gp = W_gp_cache[gp_j] Y_gp = gp_outcomes_cache[gp_j] diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 77d85feb..ff669d44 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -703,7 +703,7 @@ where `q_{g,e} = pi_g / sum_{g' in G_{trt,e}} pi_{g'}`. - **Note:** Conditional covariance Omega*(X) scales each term by per-unit sieve-estimated inverse propensities s_hat_{g'}(X) = 1/p_{g'}(X) (algorithm step 4), matching Eq 3.12. The inverse propensity estimation uses the same polynomial sieve convex minimization as the ratio estimator. Estimated s_hat values are clipped to [1, n] with a UserWarning when clipping binds, mirroring the ratio path's overlap diagnostics. 
- **Note:** Outcome regressions m_hat_{g',t,tpre}(X) use linear OLS working models. The paper's Section 4 describes flexible nonparametric nuisance estimation (sieve regression, kernel smoothing, or ML methods). The DR property ensures consistency if either the OLS outcome model or the sieve propensity ratio is correctly specified, but the linear OLS specification does not generically guarantee attainment of the semiparametric efficiency bound unless the conditional mean is linear in the covariates. - **Note:** EfficientDiD bootstrap with survey weights supported (Phase 6) via PSU-level multiplier weights -- **Note:** EfficientDiD covariates (DR path) with survey weights deferred — the doubly robust nuisance estimation does not yet thread survey weights through sieve/kernel steps +- **Note:** EfficientDiD covariates (DR path) with survey weights supported — WLS outcome regression, weighted sieve normal equations for propensity ratios/inverse propensities, survey-weighted Nadaraya-Watson kernel for conditional Omega*(X), and survey-weighted ATT averaging. Silverman bandwidth uses unweighted statistics (survey-weighted bandwidth deferred as second-order refinement). - **Note:** Cluster-robust SEs use the standard Liang-Zeger clustered sandwich estimator applied to EIF values: aggregate EIF within clusters, center, and compute variance with G/(G-1) small-sample correction. Cluster bootstrap generates multiplier weights at the cluster level (all units in a cluster share the same weight). Analytical clustered SEs are the default when `cluster` is set; cluster bootstrap is opt-in via `n_bootstrap > 0`. - **Note:** Hausman pretest operates on the post-treatment event-study vector ES(e) per Theorem A.1. Both PT-All and PT-Post fits are aggregated to ES(e) using cohort-size weights before computing the test statistic H = delta' V^{-1} delta where delta = ES_post - ES_all and V = Cov(ES_post) - Cov(ES_all). Covariance is computed from aggregated ES(e)-level EIF values. 
The variance-difference matrix V is inverted via Moore-Penrose pseudoinverse to handle finite-sample non-positive-definiteness. Effective rank of V (number of positive eigenvalues) is used as degrees of freedom. - **Note:** Last-cohort-as-control (`control_group="last_cohort"`) reclassifies the latest treatment cohort as pseudo-never-treated and drops time periods at/after that cohort's treatment start. This is distinct from CallawaySantAnna's `not_yet_treated` option which dynamically selects not-yet-treated units per (g,t) pair. diff --git a/docs/survey-roadmap.md b/docs/survey-roadmap.md index 56311734..6992be62 100644 --- a/docs/survey-roadmap.md +++ b/docs/survey-roadmap.md @@ -36,7 +36,7 @@ Phase 5 infrastructure (bootstrap+survey interaction): | SunAbraham | Pairs bootstrap + survey | **Resolved** (Phase 6, Rao-Wu rescaled) | | ContinuousDiD | Multiplier bootstrap + survey | **Resolved** (Phase 6, multiplier at PSU) | | EfficientDiD | Multiplier bootstrap + survey | **Resolved** (Phase 6, multiplier at PSU) | -| EfficientDiD | Covariates (DR path) + survey | DR nuisance estimation needs survey weight threading | +| EfficientDiD | Covariates (DR path) + survey | **Resolved**: survey weights threaded through DR nuisance estimation | All blocked combinations raise `NotImplementedError` when attempted, with a message pointing to the planned phase or describing the limitation. @@ -302,7 +302,6 @@ multi-absorb + survey weights). 
| Limitation | Reason | |-----------|--------| -| `covariates` + `survey_design` | DR nuisance path doesn't thread survey weights | | `cluster` + `survey_design` | Use `survey_design` with PSU/strata instead | ### Bootstrap + Replicate Weights (Mutual Exclusion) diff --git a/docs/tutorials/16_survey_did.ipynb b/docs/tutorials/16_survey_did.ipynb index aa1ce756..c8d056df 100644 --- a/docs/tutorials/16_survey_did.ipynb +++ b/docs/tutorials/16_survey_did.ipynb @@ -95,7 +95,7 @@ "**About the normalization warning:** You'll see `pweight weights normalized to mean=1` throughout this tutorial. ", "Survey weights are inverse selection probabilities -- they rarely have mean=1 out of the box. ", "The library rescales them internally so that weighted estimators are numerically stable. ", - "This is standard practice (Lumley 2004, §2.2). ", + "This is standard practice (Lumley 2004, \u00a72.2). ", "The warning confirms rescaling occurred; it is not an error." ] }, @@ -356,9 +356,8 @@ "- **JKn** (Jackknife delete-n): Stratified jackknife, drops one PSU per stratum.\n", "- **BRR** (Balanced Repeated Replication): Halve each stratum, reweight.\n", "- **Fay's BRR**: Modified BRR with a damping factor (0 < rho < 1).\n", - "- **SDR** (Successive Difference Replication): Used by ACS PUMS (80 replicate columns). Variance: V = 4/R × Σ(θᵣ − θ)².\n", "\n", - "`SurveyDesign` accepts replicate weights as an alternative to strata/PSU/FPC. They are **mutually exclusive** -- use one or the other.\n" + "`SurveyDesign` accepts replicate weights as an alternative to strata/PSU/FPC. They are **mutually exclusive** -- use one or the other." ] }, { @@ -531,58 +530,45 @@ "source": [ "## 9. Which Estimators Support Survey Design?\n", "\n", - "All estimators accept `survey_design` in `fit()`. 
Support depth varies — see the\n", - "[Survey Design Support](https://diff-diff.readthedocs.io/en/latest/choosing_estimator.html#survey-design-support)\n", - "table in the Choosing an Estimator guide for the full compatibility matrix.\n", - "\n", - "**Key highlights:**\n", - "- **CallawaySantAnna** has the most complete support: all design elements, replicate weights, analytical and bootstrap SEs, panel and cross-section modes\n", - "- **12 of 15 estimators** support replicate weights (BRR, Fay, JK1, JKn, SDR)\n", - "- **SyntheticDiD** and **TROP** support strata/PSU/FPC via bootstrap only (not placebo/analytical)\n" + "`diff-diff` supports survey design across all estimators, though the level of support varies:\n", + "\n", + "| Estimator | Weights | Strata/PSU/FPC (TSL) | Replicate Weights | Survey-Aware Bootstrap |\n", + "|-----------|---------|---------------------|-------------------|------------------------|\n", + "| **DifferenceInDifferences** | Full | Full | -- | -- |\n", + "| **TwoWayFixedEffects** | Full | Full | -- | -- |\n", + "| **MultiPeriodDiD** | Full | Full | -- | -- |\n", + "| **CallawaySantAnna** | pweight only | Full | Full | Multiplier at PSU |\n", + "| **TripleDifference** | pweight only | Full | Full (analytical) | -- |\n", + "| **StaggeredTripleDifference** | pweight only | Full | Full | Multiplier at PSU |\n", + "| **SunAbraham** | Full | Full | -- | Rao-Wu rescaled |\n", + "| **StackedDiD** | pweight only | Full (pweight only) | -- | -- |\n", + "| **ImputationDiD** | pweight only | Partial (no FPC) | -- | Multiplier at PSU |\n", + "| **TwoStageDiD** | pweight only | Partial (no FPC) | -- | Multiplier at PSU |\n", + "| **ContinuousDiD** | Full | Full | Full (analytical) | Multiplier at PSU |\n", + "| **EfficientDiD** | Full | Full | Full (analytical) | Multiplier at PSU |\n", + "| **SyntheticDiD** | pweight only | -- | -- | Rao-Wu rescaled |\n", + "| **TROP** | pweight only | -- | -- | Rao-Wu rescaled |\n", + "| **BaconDecomposition** | 
Diagnostic | Diagnostic | -- | -- |\n", + "\n", + "**Legend:**\n", + "- **Full**: All weight types (pweight/fweight/aweight) + strata/PSU/FPC + Taylor Series Linearization variance\n", + "- **Full (pweight only)**: Full TSL support with strata/PSU/FPC, but only accepts `pweight` weight type (`fweight`/`aweight` rejected because Q-weight composition changes their semantics)\n", + "- **Partial (no FPC)**: Weights + strata (for df) + PSU (for clustering); FPC raises `NotImplementedError`\n", + "- **pweight only** (Weights column): Only `pweight` accepted; `fweight`/`aweight` raise an error\n", + "- **pweight only** (TSL column): Sampling weights for point estimates; no strata/PSU/FPC design elements\n", + "- **Diagnostic**: Weighted descriptive statistics only (no inference)\n", + "- **--**: Not supported\n", + "\n", + "**Note:** `EfficientDiD` supports `covariates` and `survey_design` simultaneously. The doubly-robust (DR) path threads survey weights through WLS outcome regression, weighted sieve propensity ratios, and survey-weighted kernel smoothing.\n", + "\n", + "For full details, see `docs/survey-roadmap.md`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Summary\n", - "\n", - "**Key takeaways:**\n", - "\n", - "1. **Always specify the survey design** when working with survey data. Ignoring it leads to incorrect standard errors -- typically too small, leading to false positives.\n", - "\n", - "2. **`SurveyDesign`** encapsulates your survey's sampling structure in one object. Pass column names for weights, strata, PSU, and FPC.\n", - "\n", - "3. **Pass `survey_design` to `fit()`** -- the same API works across all estimators. No changes to your estimation code beyond adding one parameter.\n", - "\n", - "4. **CallawaySantAnna** has the most complete survey support: strata/PSU/FPC, replicate weights, analytical and bootstrap SEs, and both panel and cross-section modes.\n", - "\n", - "5. 
**Replicate weights** (JK1, JKn, BRR, Fay, SDR) are an alternative to strata/PSU/FPC when your survey provides them (e.g., MEPS, ACS PUMS). They are mutually exclusive with strata/PSU/FPC.\n", - "\n", - "6. **Use `subpopulation()`** instead of subsetting when estimating effects for a subgroup. Subsetting drops design information and biases variance estimates.\n", - "\n", - "7. **DEFF diagnostics** help you understand *how* the survey design affects precision. DEFF > 1 means clustering costs exceed stratification gains; DEFF < 1 means the design improves precision for that coefficient. DEFF > 2 indicates substantial clustering.\n", - "\n", - "8. **Repeated cross-sections** (`panel=False`) work with survey design for non-panel surveys like BRFSS, CPS, and ACS 1-year.\n", - "\n", - "**Quick reference:**\n", - "\n", - "| Parameter | When to use |\n", - "|-----------|------------|\n", - "| `weights` | Always -- specify the sampling weight column |\n", - "| `strata` | When the survey uses stratified sampling |\n", - "| `psu` | When multi-stage (clustered) sampling is used |\n", - "| `fpc` | When the sampling fraction is non-negligible |\n", - "| `replicate_weights` | When the survey provides replicate weights instead of strata/PSU/FPC |\n", - "\n", - "**References:**\n", - "\n", - "- Lumley, T. (2004). Analysis of Complex Survey Samples. *Journal of Statistical Software* 9(8).\n", - "- Solon, G., Haider, S. J., & Wooldridge, J. M. (2015). What Are We Weighting For? *Journal of Human Resources* 50(2).\n", - "- Binder, D. A. (1983). On the Variances of Asymptotically Normal Estimators from Complex Surveys. *International Statistical Review* 51(3).\n", - "- Rao, J. N. K., Wu, C. F. J., & Yue, K. (1992). Some Recent Work on Resampling Methods for Complex Surveys. *Survey Methodology* 18(2).\n", - "- Callaway, B. & Sant'Anna, P. H. C. (2021). Difference-in-Differences with Multiple Time Periods. *Journal of Econometrics* 225(2).\n", - "- Sant'Anna, P. H. C. & Zhao, J. (2020). 
Doubly Robust Difference-in-Differences Estimators. *Journal of Econometrics* 219(1).\n" + "## Summary\n\n**Key takeaways:**\n\n1. **Always specify the survey design** when working with survey data. Ignoring it leads to incorrect standard errors -- typically too small, leading to false positives.\n\n2. **`SurveyDesign`** encapsulates your survey's sampling structure in one object. Pass column names for weights, strata, PSU, and FPC.\n\n3. **Pass `survey_design` to `fit()`** -- the same API works across all estimators. No changes to your estimation code beyond adding one parameter.\n\n4. **CallawaySantAnna** has the most complete survey support: strata/PSU/FPC, replicate weights, analytical and bootstrap SEs, and both panel and cross-section modes.\n\n5. **Replicate weights** (JK1, JKn, BRR, Fay, SDR) are an alternative to strata/PSU/FPC when your survey provides them (e.g., MEPS, ACS PUMS). They are mutually exclusive with strata/PSU/FPC.\n\n6. **Use `subpopulation()`** instead of subsetting when estimating effects for a subgroup. Subsetting drops design information and biases variance estimates.\n\n7. **DEFF diagnostics** help you understand *how* the survey design affects precision. DEFF > 1 means clustering costs exceed stratification gains; DEFF < 1 means the design improves precision for that coefficient. DEFF > 2 indicates substantial clustering.\n\n8. **Repeated cross-sections** (`panel=False`) work with survey design for non-panel surveys like BRFSS, CPS, and ACS 1-year.\n\n**Quick reference:**\n\n| Parameter | When to use |\n|-----------|------------|\n| `weights` | Always -- specify the sampling weight column |\n| `strata` | When the survey uses stratified sampling |\n| `psu` | When multi-stage (clustered) sampling is used |\n| `fpc` | When the sampling fraction is non-negligible |\n| `replicate_weights` | When the survey provides replicate weights instead of strata/PSU/FPC |\n\n**References:**\n\n- Lumley, T. (2004). Analysis of Complex Survey Samples. 
*Journal of Statistical Software* 9(8).\n- Solon, G., Haider, S. J., & Wooldridge, J. M. (2015). What Are We Weighting For? *Journal of Human Resources* 50(2).\n- Binder, D. A. (1983). On the Variances of Asymptotically Normal Estimators from Complex Surveys. *International Statistical Review* 51(3).\n- Rao, J. N. K., Wu, C. F. J., & Yue, K. (1992). Some Recent Work on Resampling Methods for Complex Surveys. *Survey Methodology* 18(2).\n- Callaway, B. & Sant'Anna, P. H. C. (2021). Difference-in-Differences with Multiple Time Periods. *Journal of Econometrics* 225(2).\n- Sant'Anna, P. H. C. & Zhao, J. (2020). Doubly Robust Difference-in-Differences Estimators. *Journal of Econometrics* 219(1)." ] } ], @@ -593,4 +579,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} +} \ No newline at end of file diff --git a/tests/test_survey_phase3.py b/tests/test_survey_phase3.py index f43c4968..fbac504b 100644 --- a/tests/test_survey_phase3.py +++ b/tests/test_survey_phase3.py @@ -745,23 +745,29 @@ def test_bootstrap_survey_supported(self, staggered_survey_data): assert np.isfinite(result.overall_att) assert np.isfinite(result.overall_se) - def test_covariates_survey_raises(self, staggered_survey_data): - """Covariates + survey should raise NotImplementedError.""" + def test_covariates_survey_works(self, staggered_survey_data): + """Covariates + survey should produce finite results via DR path.""" from diff_diff import EfficientDiD - # Add a covariate column - staggered_survey_data["x1"] = np.random.randn(len(staggered_survey_data)) + # Add a time-invariant covariate + np.random.seed(123) + unit_vals = {u: np.random.randn() for u in staggered_survey_data["unit"].unique()} + staggered_survey_data["x1"] = staggered_survey_data["unit"].map(unit_vals) sd = SurveyDesign(weights="weight") - with pytest.raises(NotImplementedError, match="covariates"): - EfficientDiD(n_bootstrap=0).fit( - staggered_survey_data, - "outcome", - "unit", - "time", - "first_treat", - covariates=["x1"], - 
survey_design=sd, - ) + result = EfficientDiD(n_bootstrap=0).fit( + staggered_survey_data, + "outcome", + "unit", + "time", + "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + assert result.estimation_path == "dr" + assert result.survey_metadata is not None def test_survey_metadata_fields(self, staggered_survey_data): """survey_metadata has correct fields.""" @@ -864,6 +870,323 @@ def test_survey_all_aggregation(self, staggered_survey_data): assert np.isfinite(result.overall_se) +# ============================================================================= +# EfficientDiD Covariates + Survey +# ============================================================================= + + +class TestEfficientDiDCovSurvey: + """Survey design support for EfficientDiD covariates (DR) path.""" + + @pytest.fixture + def cov_survey_data(self): + """Staggered panel with time-invariant covariates and survey columns.""" + np.random.seed(42) + n_units = 60 + n_periods = 8 + rows = [] + # Assign a time-invariant covariate per unit + unit_x1 = {u: np.random.randn() for u in range(n_units)} + for unit in range(n_units): + if unit < 20: + ft = 4 + elif unit < 40: + ft = 6 + else: + ft = 0 + + stratum = unit // 12 + psu = unit // 5 + fpc_val = 120.0 + wt = 1.0 + 0.3 * stratum + + for t in range(1, n_periods + 1): + y = 10.0 + unit * 0.05 + t * 0.2 + 0.5 * unit_x1[unit] + if ft > 0 and t >= ft: + y += 2.0 + y += np.random.normal(0, 0.5) + + rows.append( + { + "unit": unit, + "time": t, + "first_treat": ft, + "outcome": y, + "weight": wt, + "stratum": stratum, + "psu": psu, + "fpc": fpc_val, + "x1": unit_x1[unit], + } + ) + return pd.DataFrame(rows) + + def test_uniform_weight_equivalence(self, cov_survey_data): + """Covariates + uniform survey weights ≈ covariates without survey.""" + from diff_diff import EfficientDiD + + # Set all weights to 1.0 + 
cov_survey_data["weight"] = 1.0 + sd = SurveyDesign(weights="weight") + + result_survey = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_nosurv = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + ) + assert result_survey.estimation_path == "dr" + assert result_nosurv.estimation_path == "dr" + np.testing.assert_allclose( + result_survey.overall_att, result_nosurv.overall_att, atol=1e-8 + ) + + def test_scale_invariance(self, cov_survey_data): + """Multiplying all weights by a constant doesn't change ATT.""" + from diff_diff import EfficientDiD + + sd1 = SurveyDesign(weights="weight") + result1 = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd1, + ) + + cov_survey_data["weight_scaled"] = cov_survey_data["weight"] * 5.0 + sd2 = SurveyDesign(weights="weight_scaled") + result2 = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd2, + ) + np.testing.assert_allclose( + result1.overall_att, result2.overall_att, atol=1e-8 + ) + + def test_nontrivial_weight_effect(self, cov_survey_data): + """Heterogeneous weights produce different ATT from unweighted.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result_survey = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_nosurv = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + ) + # Non-uniform weights (1.0 + 0.3*stratum) should produce different ATT + assert abs(result_survey.overall_att - result_nosurv.overall_att) > 1e-6 + + def test_full_design_smoke(self, cov_survey_data): + """Strata + 
PSU + FPC + covariates produces finite results.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign( + weights="weight", strata="stratum", psu="psu", fpc="fpc", + nest=True, + ) + result = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + assert result.estimation_path == "dr" + + def test_aggregation_with_survey(self, cov_survey_data): + """Event study and group aggregation work with covariates + survey.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + aggregate="all", + survey_design=sd, + ) + assert result.event_study_effects is not None + assert result.group_effects is not None + for _, eff in result.event_study_effects.items(): + assert np.isfinite(eff["effect"]) + assert np.isfinite(eff["se"]) + for _, eff in result.group_effects.items(): + assert np.isfinite(eff["effect"]) + assert np.isfinite(eff["se"]) + + def test_bootstrap_covariates_survey(self, cov_survey_data): + """Bootstrap + covariates + survey produces finite results.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result = EfficientDiD(n_bootstrap=30, seed=42).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + + def test_analytical_se_differs_from_unweighted(self, cov_survey_data): + """Survey analytical SE should differ from unweighted SE.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result_survey = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", 
"first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_nosurv = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + ) + # Non-uniform weights (1.0 + 0.3*stratum) should produce different SEs + assert result_survey.overall_se != result_nosurv.overall_se + assert np.isfinite(result_survey.overall_se) + assert result_survey.overall_se > 0 + + def test_bootstrap_se_in_ballpark_of_analytical(self, cov_survey_data): + """Bootstrap SE should be in same ballpark as analytical SE.""" + from diff_diff import EfficientDiD + + sd = SurveyDesign(weights="weight") + result_analytical = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + result_boot = EfficientDiD(n_bootstrap=199, seed=42).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + ratio = result_boot.overall_se / result_analytical.overall_se + assert 0.3 < ratio < 3.0, ( + f"Bootstrap/analytical SE ratio {ratio:.2f} outside [0.3, 3.0]" + ) + + def test_zero_weight_cohort_skipped(self, cov_survey_data): + """Zero-weight treated cohort should be skipped with a warning.""" + from diff_diff import EfficientDiD + + # Set early cohort (first_treat=4) weights to exactly zero + cov_survey_data = cov_survey_data.copy() + cov_survey_data.loc[cov_survey_data["first_treat"] == 4, "weight"] = 0.0 + sd = SurveyDesign(weights="weight") + with pytest.warns(UserWarning, match="zero survey weight"): + result = EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + + def test_zero_weight_never_treated_raises(self, cov_survey_data): + """Zero-weight never-treated group should raise ValueError (DR path).""" + from diff_diff import EfficientDiD 
+ + cov_survey_data = cov_survey_data.copy() + cov_survey_data.loc[cov_survey_data["first_treat"] == 0, "weight"] = 0.0 + sd = SurveyDesign(weights="weight") + with pytest.raises(ValueError, match="zero survey weight"): + EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + covariates=["x1"], + survey_design=sd, + ) + + def test_zero_weight_never_treated_nocov_raises(self, cov_survey_data): + """Zero-weight never-treated group should raise ValueError (nocov path).""" + from diff_diff import EfficientDiD + + cov_survey_data = cov_survey_data.copy() + cov_survey_data.loc[cov_survey_data["first_treat"] == 0, "weight"] = 0.0 + sd = SurveyDesign(weights="weight") + with pytest.raises(ValueError, match="zero survey weight"): + EfficientDiD(n_bootstrap=0).fit( + cov_survey_data, + "outcome", "unit", "time", "first_treat", + survey_design=sd, + ) + + def test_replicate_weight_aggregation(self): + """Replicate-weight aggregation SEs use compute_replicate_if_variance.""" + from diff_diff import EfficientDiD + + # Reuse the fixture from test_survey_phase6 + np.random.seed(42) + n_units, n_periods, n_rep = 60, 6, 15 + # Generate per-unit replicate weights (constant within unit across time) + unit_repwts = {} + for unit in range(n_units): + wt = 1.0 + 0.3 * (unit % 5) + unit_repwts[unit] = { + f"repwt_{r}": wt * (0.5 + np.random.random()) + for r in range(n_rep) + } + rows = [] + for unit in range(n_units): + ft = 3 if unit < 20 else (5 if unit < 40 else 0) + wt = 1.0 + 0.3 * (unit % 5) + x1 = np.random.randn() + for t in range(1, n_periods + 1): + y = 10.0 + unit * 0.05 + t * 0.2 + 0.5 * x1 + if ft > 0 and t >= ft: + y += 2.0 + y += np.random.normal(0, 0.5) + row = {"unit": unit, "time": t, "first_treat": ft, + "outcome": y, "weight": wt, "x1": x1} + row.update(unit_repwts[unit]) + rows.append(row) + import pandas as pd + data = pd.DataFrame(rows) + rep_cols = [f"repwt_{r}" for r in range(n_rep)] + + sd = SurveyDesign( + 
weights="weight", replicate_weights=rep_cols, + replicate_method="JK1", + ) + result = EfficientDiD(n_bootstrap=0).fit( + data, "outcome", "unit", "time", "first_treat", + covariates=["x1"], + aggregate="event_study", + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + assert result.event_study_effects is not None + for _, eff in result.event_study_effects.items(): + assert np.isfinite(eff["effect"]) + assert np.isfinite(eff["se"]) + assert eff["se"] > 0 + + # ============================================================================= # Scale Invariance (applies to all estimators) # =============================================================================