Skip to content

Commit 3b3171c

Browse files
author
miranov25
committed
feat(groupby_regression): add make_parallel_fit_fast (Phase 3 NumPy baseline)
- Introduces single-process NumPy implementation of per-group OLS - Keeps full API compatibility with make_parallel_fit_v2 - Prepares for upcoming Numba acceleration (Phase 3-B) - Adds test_fast_backend_consistency() verifying numerical equality vs v2 (loky)
1 parent de11f93 commit 3b3171c

File tree

3 files changed

+1776
-0
lines changed

3 files changed

+1776
-0
lines changed
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
#!/usr/bin/env python3
2+
# bench_groupby_regression_optimized.py
3+
# Unified Phase-2 / Phase-3 benchmarking suite
4+
# ----------------------------------------------------------------------
5+
# - Phase 2: legacy demo compatibility
6+
# - Phase 3: warm + repeated timings for loky / threading / fast
7+
# ----------------------------------------------------------------------
8+
9+
from __future__ import annotations
10+
11+
import argparse
12+
import os
13+
import time
14+
from typing import Callable, Dict, List, Tuple
15+
16+
import numpy as np
17+
import pandas as pd
18+
19+
20+
# ======================================================================
21+
# Utilities
22+
# ======================================================================
23+
24+
def set_blas_threads_one_v2() -> None:
    """Pin the common BLAS/OpenMP thread pools to a single thread.

    Uses ``setdefault`` so any value the caller already exported wins;
    this only prevents accidental oversubscription when the benchmark
    spawns its own worker processes/threads.
    """
    for var in ("OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS", "OMP_NUM_THREADS"):
        os.environ.setdefault(var, "1")
29+
30+
31+
def time_call_warm_v2(fn: Callable[[], object], *, warmups: int = 1, repeats: int = 5) -> Tuple[float, List[float]]:
    """Time ``fn()`` after warm-up calls.

    Calls ``fn`` untimed ``warmups`` times (clamped at >= 0), then timed
    ``repeats`` times (clamped at >= 1). Returns
    ``(median_seconds, per_run_seconds)``.
    """
    n_warm = max(0, warmups)
    n_timed = max(1, repeats)
    for _ in range(n_warm):
        fn()
    samples: List[float] = []
    for _ in range(n_timed):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return float(np.median(samples)), samples
41+
42+
43+
def _mk_synth_data_v2(n_groups: int, rows: int, *, seed: int = 123) -> pd.DataFrame:
44+
"""Generate synthetic small-group dataset for benchmarking."""
45+
rng = np.random.default_rng(seed)
46+
N = n_groups * rows
47+
df = pd.DataFrame({
48+
"group": np.repeat(np.arange(n_groups), rows),
49+
"x1": rng.normal(size=N),
50+
"x2": rng.normal(size=N),
51+
})
52+
df["y"] = 2.0 * df["x1"] + 3.0 * df["x2"] + rng.normal(scale=0.1, size=N)
53+
df["weight"] = 1.0
54+
return df
55+
56+
57+
# ======================================================================
58+
# Phase 3 benchmark core
59+
# ======================================================================
60+
61+
def benchmark_fast_backend_v2(
    *,
    n_groups: int = 1000,
    rows: int = 5,
    n_jobs: int = 4,
    warmups: int = 1,
    repeats: int = 5,
    seed: int = 123,
    verbose: bool = True,
) -> Dict[str, float]:
    """
    Benchmark make_parallel_fit_v2 (loky/threading backends) against
    make_parallel_fit_fast on identical synthetic data.

    Each backend is timed via warm-ups + median of ``repeats`` runs.
    Returns a mapping {backend_name: median_seconds}.
    """
    from groupby_regression_optimized import make_parallel_fit_v2, make_parallel_fit_fast

    set_blas_threads_one_v2()
    df = _mk_synth_data_v2(n_groups=n_groups, rows=rows, seed=seed)
    selection = pd.Series(True, index=df.index)

    # Arguments shared by every backend configuration.
    common = dict(
        df=df,
        gb_columns=["group"],
        fit_columns=["y"],
        linear_columns=["x1", "x2"],
        median_columns=[],
        weights="weight",
        selection=selection,
        addPrediction=False,
        min_stat=[2],
    )

    def cfg_loky():
        return make_parallel_fit_v2(suffix="_loky", n_jobs=n_jobs, backend="loky", **common)

    def cfg_threading():
        return make_parallel_fit_v2(suffix="_thr", n_jobs=n_jobs, backend="threading", **common)

    def cfg_fast():
        return make_parallel_fit_fast(
            suffix="_fast",
            cast_dtype="float64",
            diag=False,
            diag_prefix="diag_",
            **common,
        )

    backends = [("loky", cfg_loky), ("threading", cfg_threading), ("fast", cfg_fast)]

    if verbose:
        print("\n" + "=" * 70)
        print("PHASE 3: Fast backend benchmark (warm-up + median)")
        print("=" * 70)
        print(f"Data: {n_groups} groups × {rows} rows = {n_groups*rows} total | n_jobs={n_jobs}")
        print(f"Warm-ups: {warmups} Repeats: {repeats}\n")

    results: Dict[str, float] = {}
    for name, runner in backends:
        median_s, samples = time_call_warm_v2(runner, warmups=warmups, repeats=repeats)
        results[name] = median_s
        if verbose:
            print(f"{name:10s}: {median_s:.3f}s (runs: {', '.join(f'{x:.3f}' for x in samples)})")

    # Speedups are reported relative to the loky backend, when present.
    baseline = results.get("loky", np.nan)
    if verbose and np.isfinite(baseline):
        print("\nSpeedups (relative to loky):")
        for name, median_s in results.items():
            speedup = baseline / median_s if median_s > 0 else np.nan
            print(f"{name:10s}: {speedup:5.2f}×")
        print()

    return results
155+
156+
157+
def run_phase3_benchmarks_v2(
    *,
    n_groups: int = 1000,
    rows: int = 5,
    n_jobs: int = 4,
    warmups: int = 1,
    repeats: int = 5,
    seed: int = 123,
    csv_path: str | None = None,
    verbose: bool = True,
) -> Dict[str, float]:
    """Run the Phase-3 benchmark and optionally append results to a CSV log."""
    timings = benchmark_fast_backend_v2(
        n_groups=n_groups,
        rows=rows,
        n_jobs=n_jobs,
        warmups=warmups,
        repeats=repeats,
        seed=seed,
        verbose=verbose,
    )
    if csv_path:
        # Record the run parameters alongside the timings for traceability.
        meta = {
            "n_groups": n_groups,
            "rows": rows,
            "n_jobs": n_jobs,
            "warmups": warmups,
            "repeats": repeats,
            "seed": seed,
        }
        write_results_csv_v2(timings, csv_path=csv_path, extra_meta=meta)
    return timings
192+
193+
194+
# ======================================================================
195+
# Phase 2 compatibility shim
196+
# ======================================================================
197+
198+
def run_phase2_suite_v2() -> None:
    """
    Locate and run a Phase-2 demo/benchmark entry point, if one exists.

    Searches a fixed list of candidate names first in this module's
    globals, then in an importable ``phase2_demo`` module; prints a
    hint when nothing suitable is found.
    """
    candidates = [
        "run_phase2_suite",
        "phase2_main",
        "run_phase2",
        "demo_phase2",
        "main_phase2",
        "run_phase2_benchmarks",
        "run_phase2_demo",
    ]

    # 1) Entry point defined in this file?
    module_scope = globals()
    for name in candidates:
        entry = module_scope.get(name)
        if callable(entry):
            print(f"[Phase-2] Running entry point: {name}()")
            return entry()

    # 2) Entry point in a separate phase2_demo module? Any failure here
    # (missing module, broken entry point) falls through to the hint below.
    try:
        import phase2_demo as _p2
        for name in candidates:
            entry = getattr(_p2, name, None)
            if callable(entry):
                print(f"[Phase-2] Running entry point: phase2_demo.{name}()")
                return entry()
        print("[Phase-2] Found phase2_demo module, but no known entry point found.")
    except Exception:
        pass

    print("[Phase-2] No entry point found. "
          "Paste your Phase-2 runner into this file "
          "and name it one of: " + ", ".join(candidates))
232+
233+
234+
# ======================================================================
235+
# CSV writer for result tracking
236+
# ======================================================================
237+
238+
def write_results_csv_v2(
239+
results: Dict[str, float],
240+
*,
241+
csv_path: str,
242+
extra_meta: Dict[str, object] | None = None,
243+
) -> None:
244+
"""Append benchmark results with metadata to a CSV file."""
245+
row = {"timestamp": pd.Timestamp.now(tz="UTC").isoformat()}
246+
row.update({f"time_{k}_s": float(v) for k, v in results.items()})
247+
if extra_meta:
248+
row.update(extra_meta)
249+
df = pd.DataFrame([row])
250+
header = not os.path.exists(csv_path)
251+
df.to_csv(csv_path, mode="a", index=False, header=header)
252+
print(f"[log] Results appended to {csv_path}")
253+
254+
255+
# ======================================================================
256+
# CLI entry point (no symmetry break)
257+
# ======================================================================
258+
259+
def main_v2(argv: List[str] | None = None) -> None:
    """Command-line interface for the benchmark suite.

    With ``--phase2`` runs the legacy Phase-2 suite; otherwise runs the
    Phase-3 fast-backend benchmark.

    Fix: the RNG seed was hard-wired even though
    ``run_phase3_benchmarks_v2`` accepts ``seed``; a ``--seed`` flag is
    now exposed (default 123 preserves the old behavior).

    :param argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    p = argparse.ArgumentParser(description="Benchmarks for GroupByRegressor (v2/v3)")
    p.add_argument("--phase2", action="store_true", help="Run Phase-2 legacy suite")
    p.add_argument("--phase3", action="store_true", help="Run Phase-3 fast benchmark")
    p.add_argument("--n-groups", type=int, default=1000)
    p.add_argument("--rows", type=int, default=5)
    p.add_argument("--n-jobs", type=int, default=4)
    p.add_argument("--warmups", type=int, default=1)
    p.add_argument("--repeats", type=int, default=5)
    # New: make the synthetic-data seed configurable from the CLI.
    p.add_argument("--seed", type=int, default=123, help="RNG seed for synthetic data")
    p.add_argument("--csv", type=str, help="Optional path to append CSV results")
    args = p.parse_args(argv)

    if args.phase2:
        run_phase2_suite_v2()
    else:
        # Phase 3 is the default path (with or without --phase3).
        run_phase3_benchmarks_v2(
            n_groups=args.n_groups,
            rows=args.rows,
            n_jobs=args.n_jobs,
            warmups=args.warmups,
            repeats=args.repeats,
            seed=args.seed,
            csv_path=args.csv,
        )
283+
284+
285+
# Script entry point: dispatch to the CLI parser.
if __name__ == "__main__":
    main_v2()

0 commit comments

Comments
 (0)