Skip to content

Commit 4d90f73

Browse files
author
miranov25
committed
refactor: update imports for package structure
Changes: - Enable package-level exports in __init__.py (uncommented imports) - Update test imports to use relative imports (..) - Update benchmark imports to use relative imports (..) - Add placeholder for future aliases in groupby_regression.py (commented) - Update all internal imports in test_groupby_regression_optimized.py All imports now work with new package structure. Tests verified: 38/38 passing - test_groupby_regression.py: 14/14 - test_groupby_regression_optimized.py: 24/24
1 parent e43f332 commit 4d90f73

File tree

9 files changed

+2388
-25
lines changed

9 files changed

+2388
-25
lines changed

UTILS/dfextensions/groupby_regression/__init__.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,23 @@
1818
"""
1919

2020
# Import main classes from modules
21-
# from .groupby_regression import GroupByRegressor
22-
# from .groupby_regression_optimized import (
23-
# make_parallel_fit_v2,
24-
# make_parallel_fit_v3,
25-
# make_parallel_fit_v4,
26-
# GroupByRegressorOptimized,
27-
# )
21+
from .groupby_regression import GroupByRegressor
22+
from .groupby_regression_optimized import (
23+
make_parallel_fit_v2,
24+
make_parallel_fit_v3,
25+
make_parallel_fit_v4,
26+
GroupByRegressorOptimized,
27+
)
2828

2929
# Version info
3030
__version__ = '2.0.0'
3131
__author__ = 'Marian Ivanov'
3232

3333
# Expose at package level
34-
# __all__ = [
35-
# 'GroupByRegressor',
36-
# 'make_parallel_fit_v2',
37-
# 'make_parallel_fit_v3',
38-
# 'make_parallel_fit_v4',
39-
# 'GroupByRegressorOptimized',
40-
# ]
34+
__all__ = [
35+
'GroupByRegressor',
36+
'make_parallel_fit_v2',
37+
'make_parallel_fit_v3',
38+
'make_parallel_fit_v4',
39+
'GroupByRegressorOptimized',
40+
]

UTILS/dfextensions/groupby_regression/benchmarks/bench_groupby_regression.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535

3636
# --- Import the project module ---
3737
try:
38-
import groupby_regression as gr
39-
from groupby_regression import GroupByRegressor
38+
from .. import groupby_regression as gr
39+
from ..groupby_regression import GroupByRegressor
4040
except Exception as e:
4141
print("[ERROR] Failed to import groupby_regression.py:", e, file=sys.stderr)
4242
raise
Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
#!/usr/bin/env python3
2+
"""
3+
bench_groupby_regression.py — Single-file benchmark suite and reporter
4+
5+
Scenarios covered (configurable via CLI):
6+
1) Clean baseline (serial & parallel)
7+
2) Outliers: 5% @ 3σ, 10% @ 5σ, 10% @ 10σ
8+
3) Group sizes: 5, 20, 100 rows/group
9+
4) n_jobs: 1, 4, 10
10+
5) fitters: ols, robust, huber (if supported by implementation)
11+
6) sigmaCut: 3, 5, 10, 100
12+
13+
Outputs:
14+
- Pretty text report
15+
- JSON results (per scenario, with timing and configuration)
16+
- Optional CSV summary
17+
18+
Usage examples:
19+
python3 bench_groupby_regression.py --quick
20+
python3 bench_groupby_regression.py --rows-per-group 5 --groups 10000 --out out_dir
21+
python3 bench_groupby_regression.py --emit-csv
22+
23+
Note:
24+
This script imports 'groupby_regression' from its parent package (run it as a module) and
25+
uses GroupByRegressor.make_parallel_fit(...). See the wiring in _run_one().
26+
"""
27+
from __future__ import annotations
28+
import argparse, json, math, os, sys, time
29+
from dataclasses import dataclass, asdict
30+
from pathlib import Path
31+
from typing import List, Dict, Any, Tuple
32+
33+
import numpy as np
34+
import pandas as pd
35+
36+
# --- Import the project module ---
# Both imports must be package-relative: this benchmark now lives inside the
# groupby_regression package (run as `python -m <pkg>.benchmarks.bench_groupby_regression`).
try:
    from .. import groupby_regression as gr
    from ..groupby_regression import GroupByRegressor  # was absolute — inconsistent with the line above and unresolvable inside the package
except Exception as e:
    print("[ERROR] Failed to import groupby_regression.py:", e, file=sys.stderr)
    raise
43+
44+
# --- Data Generators (Phase 1) ---
45+
def _make_groups(n_rows: int, n_groups: int, rng: np.random.Generator) -> np.ndarray:
46+
base = np.repeat(np.arange(n_groups, dtype=np.int32), n_rows // n_groups)
47+
rem = n_rows - base.size
48+
if rem > 0:
49+
extra = rng.choice(n_groups, size=rem, replace=False)
50+
base = np.concatenate([base, extra.astype(np.int32, copy=False)])
51+
rng.shuffle(base)
52+
return base
53+
54+
def _find_diag_col(df: pd.DataFrame, base: str, dp: str, suffix: str | None = None) -> str | None:
55+
"""
56+
Return diagnostics column for a given base (e.g. 'time_ms'), handling suffixes.
57+
If suffix is provided, match startswith(dp+base) and endswith(suffix).
58+
"""
59+
exact = dp + base
60+
if suffix is None and exact in df.columns:
61+
return exact
62+
pref = dp + base
63+
for c in df.columns:
64+
if not isinstance(c, str):
65+
continue
66+
if not c.startswith(pref):
67+
continue
68+
if suffix is not None and not c.endswith(suffix):
69+
continue
70+
return c
71+
return None
72+
73+
74+
def create_clean_data(n_rows: int, n_groups: int, *, seed: int = 42, noise_sigma: float = 1.0, x_corr: float = 0.0) -> pd.DataFrame:
    """Synthesize a clean benchmark frame with columns group, x1, x2, y.

    x1 and x2 are jointly normal with correlation *x_corr*; the target is
    y = 2*x1 + 3*x2 + N(0, noise_sigma). All value columns are float32.
    """
    rng = np.random.default_rng(seed)
    labels = _make_groups(n_rows, n_groups, rng)
    cov = np.array([[1.0, x_corr], [x_corr, 1.0]])
    draws = rng.multivariate_normal(np.array([0.0, 0.0]), cov, size=n_rows, method="cholesky")
    feat1 = draws[:, 0].astype(np.float32)
    feat2 = draws[:, 1].astype(np.float32)
    noise = rng.normal(0.0, noise_sigma, size=n_rows).astype(np.float32)
    target = (2.0 * feat1 + 3.0 * feat2 + noise).astype(np.float32)
    return pd.DataFrame({"group": labels, "x1": feat1, "x2": feat2, "y": target})
85+
86+
def create_data_with_outliers(n_rows: int, n_groups: int, *, outlier_pct: float = 0.10, outlier_magnitude: float = 5.0,
                              seed: int = 42, noise_sigma: float = 1.0, x_corr: float = 0.0) -> pd.DataFrame:
    """Clean dataset with an outlier_pct fraction of y values shifted by ±outlier_magnitude·sigma."""
    df = create_clean_data(n_rows, n_groups, seed=seed, noise_sigma=noise_sigma, x_corr=x_corr)
    rng = np.random.default_rng(seed + 1337)  # independent stream for the contamination
    n_out = int(math.floor(outlier_pct * n_rows))
    if n_out > 0:
        hit = rng.choice(n_rows, size=n_out, replace=False)
        direction = rng.choice(np.array([-1.0, 1.0], dtype=np.float32), size=n_out, replace=True)
        offsets = (outlier_magnitude * noise_sigma * direction).astype(np.float32)
        y_vals = df["y"].to_numpy(copy=True)
        y_vals[hit] = (y_vals[hit] + offsets).astype(np.float32)
        df["y"] = y_vals
    return df
99+
100+
# --- Benchmark Plumbing ---
101+
@dataclass
class Scenario:
    """A single benchmark configuration: data shape, contamination and fit options."""
    name: str              # human-readable label used in reports and file names
    outlier_pct: float     # fraction of rows contaminated (0.0 = clean data)
    outlier_mag: float     # outlier shift, in multiples of the noise sigma
    rows_per_group: int    # target rows per group
    n_groups: int          # number of distinct groups
    n_jobs: int            # worker count for make_parallel_fit (1 = serial)
    fitter: str            # fitter backend name (e.g. 'ols', 'robust', 'huber')
    sigmaCut: float        # sigma-clipping threshold passed to the fitter
111+
112+
def _run_one(df: pd.DataFrame, scenario: Scenario, args) -> Dict[str, Any]:
    """Time one scenario's GroupByRegressor.make_parallel_fit call.

    Returns a dict with the scenario name, its configuration, timing results,
    and (only when args.diag is set) the per-group parameter frame.
    """
    df = df.copy()  # avoid mutating the caller's frame
    # Second grouping key and unit weights, as expected by make_parallel_fit.
    df["group2"] = df["group"].astype(np.int32)
    df["weight"] = 1.0
    selection = pd.Series(True, index=df.index)  # fit on every row

    t0 = time.perf_counter()
    _, df_params = GroupByRegressor.make_parallel_fit(
        df,
        gb_columns=["group", "group2"],
        fit_columns=["y"],
        linear_columns=["x1", "x2"],
        median_columns=[],
        weights="weight",
        suffix="_fit",
        selection=selection,
        addPrediction=False,
        n_jobs=scenario.n_jobs,
        min_stat=[3, 4],
        sigmaCut=scenario.sigmaCut,
        fitter=scenario.fitter,
        batch_size="auto",
        diag=getattr(args, "diag", False),
        diag_prefix=getattr(args, "diag_prefix", "diag_"),
    )
    dt = time.perf_counter() - t0
    n_groups_eff = int(df_params.shape[0])
    # Normalized cost; NaN when no group survived the min_stat cuts.
    per_1k = dt / (n_groups_eff / 1000.0) if n_groups_eff else float("nan")

    return {
        "scenario": scenario.name,
        "config": {
            "n_jobs": scenario.n_jobs,
            "sigmaCut": scenario.sigmaCut,
            "fitter": scenario.fitter,
            "rows_per_group": scenario.rows_per_group,
            "n_groups": scenario.n_groups,
            "outlier_pct": scenario.outlier_pct,
            "outlier_mag": scenario.outlier_mag,
        },
        "result": {
            "total_sec": dt,
            "sec_per_1k_groups": per_1k,
            "n_groups_effective": n_groups_eff,
        },
        # Kept only when diagnostics are requested; stripped before JSON dump.
        "df_params": df_params if getattr(args, "diag", False) else None,
    }
159+
160+
def _make_df(s: Scenario, seed: int = 7) -> pd.DataFrame:
    """Generate the benchmark frame for scenario *s* (clean or contaminated)."""
    total_rows = s.rows_per_group * s.n_groups
    if s.outlier_pct <= 0.0:
        return create_clean_data(total_rows, s.n_groups, seed=seed)
    return create_data_with_outliers(
        total_rows,
        s.n_groups,
        outlier_pct=s.outlier_pct,
        outlier_magnitude=s.outlier_mag,
        seed=seed,
    )
166+
167+
def _format_report(rows: List[Dict[str, Any]]) -> str:
168+
lines = []
169+
lines.append("=" * 64); lines.append("BENCHMARK: GroupBy Regression"); lines.append("=" * 64)
170+
for r in rows:
171+
cfg = r["config"]; res = r["result"]
172+
lines.append("")
173+
lines.append(f"Scenario: {r['scenario']}")
174+
lines.append(f" Config: n_jobs={cfg['n_jobs']}, sigmaCut={cfg['sigmaCut']}, fitter={cfg['fitter']}")
175+
lines.append(f" Data: {cfg['rows_per_group']*cfg['n_groups']:,} rows, {res['n_groups_effective']:,} groups (target {cfg['n_groups']:,}), ~{cfg['rows_per_group']} rows/group")
176+
if cfg['outlier_pct']>0:
177+
lines.append(f" Outliers: {cfg['outlier_pct']*100:.0f}% at {cfg['outlier_mag']}σ")
178+
lines.append(f" Result: {res['total_sec']:.2f}s ({res['sec_per_1k_groups']:.2f}s per 1k groups)")
179+
lines.append("")
180+
return "\n".join(lines)
181+
182+
def _build_scenarios(args) -> List[Scenario]:
    """Assemble the scenario list from CLI options (parallel ones skipped with --serial-only)."""
    rpg, ng, fitter, sc = args.rows_per_group, args.groups, args.fitter, args.sigmaCut
    scenarios: List[Scenario] = [Scenario("Clean Data, Serial", 0.0, 0.0, rpg, ng, 1, fitter, sc)]
    if not args.serial_only:
        scenarios.append(Scenario("Clean Data, Parallel", 0.0, 0.0, rpg, ng, args.n_jobs, fitter, sc))
    # Outlier sets (serial baselines plus a high-contamination stress test).
    scenarios.append(Scenario("5% Outliers (3σ), Serial", 0.05, 3.0, rpg, ng, 1, fitter, sc))
    scenarios.append(Scenario("10% Outliers (5σ), Serial", 0.10, 5.0, rpg, ng, 1, fitter, sc))
    scenarios.append(Scenario("30% Outliers (5σ), Serial", 0.30, 5.0, rpg, ng, 1, fitter, sc))
    if not args.serial_only:
        scenarios.append(Scenario("30% Outliers (5σ), Parallel", 0.30, 5.0, rpg, ng, args.n_jobs, fitter, sc))
        scenarios.append(Scenario("10% Outliers (5σ), Parallel", 0.10, 5.0, rpg, ng, args.n_jobs, fitter, sc))
    scenarios.append(Scenario("10% Outliers (10σ), Serial", 0.10, 10.0, rpg, ng, 1, fitter, sc))
    return scenarios

def _dump_scenario_diagnostics(dfp: pd.DataFrame, scenario_name: str, dp: str, out_dir: Path) -> None:
    """Write the top-10 slowest / most-refitted groups of one scenario to CSV files."""
    safe = (scenario_name.replace(" ", "_")
            .replace("%", "pct")
            .replace("(", "").replace(")", "")
            .replace("σ", "sigma"))
    tcol = _find_diag_col(dfp, "time_ms", dp)
    if tcol:
        dfp.sort_values(tcol, ascending=False).head(10).to_csv(
            out_dir / f"diag_top10_time__{safe}.csv", index=False
        )
    rcol = _find_diag_col(dfp, "n_refits", dp)
    if rcol:
        dfp.sort_values(rcol, ascending=False).head(10).to_csv(
            out_dir / f"diag_top10_refits__{safe}.csv", index=False
        )

def run_suite(args) -> Tuple[List[Dict[str, Any]], str, str, str | None]:
    """Run every benchmark scenario and persist the report and results.

    Writes benchmark_report.txt and benchmark_results.json to args.out
    (plus benchmark_results.csv with --emit-csv, and per-scenario
    diagnostic CSVs with --diag).

    Returns (results, txt_path, json_path, csv_path-or-None).
    """
    scenarios = _build_scenarios(args)

    out_dir = Path(args.out).resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    diag_rows: List[Dict[str, Any]] = []          # machine-readable per-scenario summaries
    human_summaries: List[Tuple[str, str]] = []   # (scenario name, formatted summary)
    results: List[Dict[str, Any]] = []

    for s in scenarios:
        df = _make_df(s, seed=args.seed)
        out = _run_one(df, s, args)
        results.append(out)

        if args.diag and out.get("df_params") is not None:
            dfp = out["df_params"]
            dp = args.diag_prefix
            _dump_scenario_diagnostics(dfp, s.name, dp, out_dir)
            summary = GroupByRegressor.summarize_diagnostics(dfp, diag_prefix=dp, diag_suffix="_fit")
            diag_rows.append({"scenario": s.name, **summary})
            human_summaries.append((s.name, GroupByRegressor.format_diagnostics_summary(summary)))
        # NOTE: the diagnostics summary is appended to the report ONCE, after
        # the loop — the previous per-iteration append was wasted work because
        # the open(txt_path, "w") below truncates the file anyway.

    # Save the human report and the machine-readable results.
    txt_path = out_dir / "benchmark_report.txt"
    json_path = out_dir / "benchmark_results.json"
    with open(txt_path, "w") as f:
        f.write(_format_report(results))
    # df_params is a DataFrame and not JSON-serializable; strip it first.
    results_slim = [{k: v for k, v in r.items() if k != "df_params"} for r in results]
    with open(json_path, "w") as f:
        json.dump(results_slim, f, indent=2)

    csv_path = None
    if args.emit_csv:
        import csv
        csv_path = out_dir / "benchmark_results.csv"
        with open(csv_path, "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["scenario", "n_jobs", "sigmaCut", "fitter", "rows_per_group", "n_groups",
                        "outlier_pct", "outlier_mag", "total_sec", "sec_per_1k_groups", "n_groups_effective"])
            for r in results:
                cfg = r["config"]; res = r["result"]
                w.writerow([r["scenario"], cfg["n_jobs"], cfg["sigmaCut"], cfg["fitter"],
                            cfg["rows_per_group"], cfg["n_groups"], cfg["outlier_pct"], cfg["outlier_mag"],
                            res["total_sec"], res["sec_per_1k_groups"], res["n_groups_effective"]])

    # --- Append diagnostics summaries to the text report ---
    if args.diag and human_summaries:
        with open(txt_path, "a") as f:
            f.write("\nDiagnostics summary (per scenario):\n")
            for name, human in human_summaries:
                f.write(f" - {name}: {human}\n")
            f.write("\nTop-10 violators saved as diag_top10_time__<scenario>.csv "
                    "and diag_top10_refits__<scenario>.csv\n")

    return results, str(txt_path), str(json_path), (str(csv_path) if csv_path else None)
303+
304+
def parse_args():
    """Parse the benchmark CLI; --quick caps the group count at 200."""
    parser = argparse.ArgumentParser(description="GroupBy Regression Benchmark Suite")
    parser.add_argument("--rows-per-group", type=int, default=5, help="Rows per group.")
    parser.add_argument("--groups", type=int, default=5000, help="Number of groups.")
    parser.add_argument("--n-jobs", type=int, default=4, help="Workers for parallel scenarios.")
    parser.add_argument("--sigmaCut", type=float, default=5.0, help="Sigma cut for robust fitting.")
    parser.add_argument("--fitter", type=str, default="ols",
                        help="Fitter: ols|robust|huber depending on implementation.")
    parser.add_argument("--seed", type=int, default=7, help="Random seed.")
    parser.add_argument("--out", type=str, default="bench_out", help="Output directory.")
    parser.add_argument("--emit-csv", action="store_true", help="Also emit CSV summary.")
    parser.add_argument("--serial-only", action="store_true", help="Skip parallel scenarios.")
    parser.add_argument("--quick", action="store_true", help="Small quick run: groups=200.")
    parser.add_argument("--diag", action="store_true",
                        help="Collect per-group diagnostics into dfGB (diag_* columns).")
    parser.add_argument("--diag-prefix", type=str, default="diag_",
                        help="Prefix for diagnostic columns (default: diag_).")

    parsed = parser.parse_args()
    if parsed.quick:
        parsed.groups = min(parsed.groups, 200)
    return parsed
325+
326+
327+
328+
def main():
    """CLI entry point: run the suite, print the report, list saved files."""
    args = parse_args()
    results, txt_path, json_path, csv_path = run_suite(args)
    print(_format_report(results))
    print("\nSaved outputs:")
    print(" -", txt_path)
    print(" -", json_path)
    # CSV is only produced with --emit-csv.
    if csv_path: print(" -", csv_path)

if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)