diff --git a/openquake/hme/model_test_frameworks/gem/gem_test_functions.py b/openquake/hme/model_test_frameworks/gem/gem_test_functions.py index e41f902..60f0fd4 100644 --- a/openquake/hme/model_test_frameworks/gem/gem_test_functions.py +++ b/openquake/hme/model_test_frameworks/gem/gem_test_functions.py @@ -27,6 +27,7 @@ ) from openquake.hme.utils.utils import _n_procs, breakpoint from openquake.hme.utils.stats import geom_mean, weighted_geom_mean +from scipy.stats import poisson, nbinom, t as t_dist def get_rupture_gdf_cell_moment(rupture_gdf, t_yrs, rup_groups=None): @@ -725,3 +726,148 @@ def catalog_ground_motion_eval_fn(test_config, input_data): logging.warning(f"can't do eq {eq.name}") return gmm_results + + +def N_test_annual( + eq_gdf: GeoDataFrame, + mean_annual_model_rate: float, + conf_interval: float, + test_method: str = "ecdf" +) -> dict: + """ + N-test using annual earthquake count distribution. + + Tests if the model's mean annual rate is consistent with the observed + annual earthquake counts, accounting for temporal variability without + assuming a Poisson process. + + If the data shows overdispersion (variance > mean), estimates a negative + binomial dispersion parameter for visualization and reporting. + + Parameters + ---------- + eq_gdf : GeoDataFrame + Earthquake geodataframe with observed earthquakes + mean_annual_model_rate : float + Mean annual rate from the model + conf_interval : float + Confidence interval for the test (e.g., 0.95 for 95%) + test_method : str, optional + Method for calculating confidence intervals: + - "ecdf": Use empirical percentiles from observed distribution (default) + - "t_distribution": Use t-distribution confidence interval + + Returns + ------- + dict + Test results including annual counts, dispersion statistics, and + confidence intervals + """ + # Extract years from earthquake times + eq_years = pd.DatetimeIndex(eq_gdf['time']).year + unique_years = sorted(eq_years.unique()) + n_years = len(unique_years) + + # Count earthquakes per year + annual_counts = np.array([ + (eq_years == year).sum() for year in unique_years + ]) + + # Mean and standard deviation of annual counts + mean_annual_obs = annual_counts.mean() + std_annual_obs = annual_counts.std(ddof=1) + + # Calculate variance for dispersion estimation + var_annual_obs = annual_counts.var(ddof=1) + + # Estimate dispersion parameter r for negative binomial + # r = mu^2 / (var - mu) + # If var <= mu, data is not overdispersed (Poisson-like or underdispersed) + if var_annual_obs > mean_annual_obs: + r_dispersion = mean_annual_obs**2 / (var_annual_obs - mean_annual_obs) + is_overdispersed = True + logging.info( + f"N-Test (annual, {test_method}): Observed overdispersion - " + f"mean={mean_annual_obs:.2f}, var={var_annual_obs:.2f}, r={r_dispersion:.2f}" + ) + else: + r_dispersion = None + is_overdispersed = False + logging.info( + f"N-Test (annual, {test_method}): No overdispersion detected - " + f"mean={mean_annual_obs:.2f}, var={var_annual_obs:.2f}" + ) + + # Calculate confidence interval based on test_method + if test_method == "ecdf": + # Use empirical percentiles from observed distribution + alpha = 1 - conf_interval + lower_percentile = (alpha / 2) * 100 + upper_percentile = (1 - alpha / 2) * 100 + + conf_min = np.percentile(annual_counts, lower_percentile) + conf_max = np.percentile(annual_counts, upper_percentile) + + # Calculate what percentile the model rate corresponds to + model_percentile = (annual_counts < mean_annual_model_rate).sum() / n_years * 100 + + logging.info( + f"N-Test (annual, ECDF): Model rate {mean_annual_model_rate:.2f}, " + f"Observed {conf_interval*100:.0f}% CI: [{conf_min:.2f}, {conf_max:.2f}] " + f"(from {lower_percentile:.1f}th to {upper_percentile:.1f}th percentile)" + ) + logging.info( + f"N-Test (annual, ECDF): Model rate at {model_percentile:.1f}th percentile " + f"of observed distribution" + ) + else: + # Use t-distribution confidence interval (doesn't assume Poisson) + t_crit = t_dist.ppf((1 + conf_interval) / 2, n_years - 1) + margin_error = t_crit * (std_annual_obs / np.sqrt(n_years)) + + conf_min = mean_annual_obs - margin_error + conf_max = mean_annual_obs + margin_error + + logging.info( + f"N-Test (annual, t-distribution): Model rate {mean_annual_model_rate:.2f}, " + f"Observed mean {mean_annual_obs:.2f} ± {margin_error:.2f} " + f"({conf_interval*100:.0f}% CI: [{conf_min:.2f}, {conf_max:.2f}])" + ) + + # Test passes if model mean is within CI + test_pass = conf_min <= mean_annual_model_rate <= conf_max + + test_res = "Pass" if test_pass else "Fail" + logging.info(f"N-Test (annual, {test_method}): {test_res}") + + test_result = { + "conf_interval_frac": conf_interval, + "conf_interval": (conf_min, conf_max), + "n_pred_earthquakes": mean_annual_model_rate, # annual rate + "n_obs_earthquakes": mean_annual_obs, # mean annual count + "test_res": test_res, + "test_pass": bool(test_pass), + # Test method + "test_method": test_method, + # Temporal variability statistics + "is_overdispersed": is_overdispersed, + "r_dispersion": float(r_dispersion) if r_dispersion is not None else None, + "variance_annual_obs": float(var_annual_obs), + # Additional data for annual distribution plotting + "annual_counts": annual_counts.tolist(), + "years": unique_years, + "n_years": n_years, + "mean_annual_obs_count": float(mean_annual_obs), + "std_annual_obs_count": float(std_annual_obs), + "mean_annual_model_rate": mean_annual_model_rate, + } + + # Add ECDF-specific fields if using ECDF method + if test_method == "ecdf": + test_result.update({ + "model_percentile": float(model_percentile), + "lower_percentile": float(lower_percentile), + "upper_percentile": float(upper_percentile), + }) + + return test_result diff --git a/openquake/hme/model_test_frameworks/gem/gem_tests.py b/openquake/hme/model_test_frameworks/gem/gem_tests.py index 01c1895..7a165e0 100644 --- a/openquake/hme/model_test_frameworks/gem/gem_tests.py +++ b/openquake/hme/model_test_frameworks/gem/gem_tests.py @@ -22,6 +22,7 @@ moment_over_under_eval_fn, rupture_matching_eval_fn, catalog_ground_motion_eval_fn, + N_test_annual, ) from ..relm.relm_tests import ( @@ -208,16 +209,51 @@ def L_test( def N_test(cfg: dict, input_data: dict) -> dict: logging.info("Running N-Test") - + test_config = cfg["config"]["model_framework"]["gem"]["N_test"] completeness_table = cfg["input"]["seis_catalog"].get("completeness_table") test_config["mag_bins"] = get_mag_bins_from_cfg(cfg) prospective = test_config.get("prospective", False) + prob_model = test_config["prob_model"] + + # Handle annual prob_model separately (GEM-specific implementation) + if prob_model == "annual": + if prospective: + eq_gdf = input_data["pro_gdf"] + else: + eq_gdf = input_data["eq_gdf"] + + if completeness_table is not None: + logging.warning( + "N-Test with prob_model='annual' does not support completeness tables. " + "Results may not be accurate." + ) + + # Get mean annual rate from model + mag_bins = get_mag_bins_from_cfg(cfg) + model_mfd = get_model_mfd(input_data["rupture_gdf"], mag_bins, t_yrs=1.0) + mean_annual_rate = sum(model_mfd.values()) + + conf_interval = test_config.get("conf_interval", 0.95) + test_method = test_config.get("test_method", "ecdf") + test_results = N_test_annual(eq_gdf, mean_annual_rate, conf_interval, test_method) + + # Add metadata for plotting/reporting + test_results["M_min"] = min(mag_bins.keys()) + test_results["prob_model"] = "annual" + + logging.info( + "N-Test number obs eqs: {}".format(test_results["n_obs_earthquakes"]) + ) + logging.info( + "N-Test number pred eqs: {}".format(test_results["n_pred_earthquakes"]) + ) + logging.info("N-Test {}".format(test_results["test_pass"])) + return test_results - if ( - test_config["prob_model"] in ["poisson", "poisson_cum"] - ) and not prospective: + # For other prob_models, use RELM implementation + if prob_model in ["poisson", "poisson_cum"] and not prospective: if completeness_table is not None: test_config["completeness_table"] = completeness_table test_config["mag_bins"] = get_mag_bins_from_cfg(cfg) diff --git a/openquake/hme/model_test_frameworks/relm/relm_test_functions.py b/openquake/hme/model_test_frameworks/relm/relm_test_functions.py index c0f397d..2db09c1 100644 --- a/openquake/hme/model_test_frameworks/relm/relm_test_functions.py +++ b/openquake/hme/model_test_frameworks/relm/relm_test_functions.py @@ -695,6 +695,12 @@ def n_test_function(rup_gdf, eq_gdf, test_config: dict): ] test_result = N_test_empirical(n_obs, n_eq_samples, conf_interval) + elif test_config["prob_model"] == "annual": + raise ValueError( + "prob_model='annual' is not supported in the RELM framework. " + "Use the GEM framework instead (model_framework: gem: N_test:)." + ) + elif test_config["prob_model"] == "neg_binom": raise NotImplementedError("can't subdivide earthquakes yet") n_eqs_in_subs = subdivide_observed_eqs( diff --git a/openquake/hme/utils/plots.py b/openquake/hme/utils/plots.py index acde9f3..00ce3f2 100644 --- a/openquake/hme/utils/plots.py +++ b/openquake/hme/utils/plots.py @@ -62,6 +62,120 @@ def _make_stoch_mfds(mfd, iters: int, t_yrs: float = 1.0): return stoch_mfd_vals +def plot_N_test_annual(N_test_results: dict): + """ + Plot histogram of annual earthquake counts with model distribution. + + Shows either Poisson (if no overdispersion) or Negative Binomial + (if overdispersion detected) distribution for comparison. + + Parameters + ---------- + N_test_results : dict + N-test results with prob_model='annual' containing annual_counts and + mean_annual_model_rate + + Returns + ------- + fig + Matplotlib figure + """ + annual_counts = np.array(N_test_results["annual_counts"]) + mean_annual_model_rate = N_test_results["mean_annual_model_rate"] + is_overdispersed = N_test_results.get("is_overdispersed", False) + r_dispersion = N_test_results.get("r_dispersion") + conf_interval = N_test_results.get("conf_interval", (None, None)) + + fig, ax = plt.subplots(figsize=(10, 6)) + + # Create histogram of observed annual counts + max_count = int(np.ceil(max(annual_counts.max(), mean_annual_model_rate * 2))) + bins = np.arange(-0.5, max_count + 1.5, 1) + + ax.hist( + annual_counts, + bins=bins, + density=True, + histtype="stepfilled", + alpha=0.6, + color="C0", + label=f"Observed annual counts (n={len(annual_counts)} years)", + edgecolor="black", + linewidth=0.5, + ) + + # Plot model distribution - use negative binomial if overdispersed, otherwise Poisson + x_vals = np.arange(0, max_count + 1) + + if is_overdispersed and r_dispersion is not None: + # Negative binomial distribution + # Convert (mu, r) to scipy's (n, p) parameterization + # n = r, p = r / (r + mu) + from scipy.stats import nbinom + n_param = r_dispersion + p_param = r_dispersion / (r_dispersion + mean_annual_model_rate) + model_probs = nbinom.pmf(x_vals, n_param, p_param) + + ax.plot( + x_vals, + model_probs, + "o-", + color="C1", + label=f"Negative Binomial (μ={mean_annual_model_rate:.2f}, r={r_dispersion:.2f})", + linewidth=2, + markersize=6, + ) + else: + # Poisson distribution + model_probs = poisson.pmf(x_vals, mean_annual_model_rate) + + ax.plot( + x_vals, + model_probs, + "o-", + color="C1", + label=f"Poisson (λ={mean_annual_model_rate:.2f})", + linewidth=2, + markersize=6, + ) + + # Add mean lines + ax.axvline( + annual_counts.mean(), + color="C0", + linestyle="--", + linewidth=2, + label=f"Mean observed ({annual_counts.mean():.2f})", + ) + ax.axvline( + mean_annual_model_rate, + color="C1", + linestyle="--", + linewidth=2, + label=f"Mean model ({mean_annual_model_rate:.2f})", + ) + + # Add confidence interval shading if available + if conf_interval[0] is not None and conf_interval[1] is not None: + ax.axvspan( + conf_interval[0], + conf_interval[1], + alpha=0.2, + color="C0", + label=f"Obs. CI ({N_test_results.get('conf_interval_frac', 0.95)*100:.0f}%)", + ) + + ax.set_xlabel("Number of Earthquakes per Year", fontsize=12) + ax.set_ylabel("Probability Density", fontsize=12) + ax.set_title("Annual Earthquake Count Distribution", fontsize=14) + ax.legend(loc="best") + ax.grid(True, alpha=0.3) + + plt.tight_layout() + + return fig + + def plot_N_test_results( N_test_results: dict, return_fig: bool = False, @@ -75,6 +189,9 @@ def plot_N_test_results( N_o=N_test_results["n_obs_earthquakes"], conf_interval=N_test_results["conf_interval"], ) + elif N_test_results["prob_model"] == "annual": + # Use the annual plotting function + fig = plot_N_test_annual(N_test_results) elif N_test_results.get("pred_samples"): fig = plot_N_test_empirical( N_test_results["pred_samples"],