Skip to content

Commit 4b7fa67

Browse files
author
miranov25
committed
test(v2/v3/v4): add verbose diagnostics for multi-target layout
- Keep existing test setup; no external fixtures introduced - Print expected groups and row counts for fast CI triage - Assert one-row-per-group, no duplicate keys, matching group-key sets - Check presence of per-target intercept and slope columns (with suffix)
1 parent 0c4961c commit 4b7fa67

File tree

2 files changed

+264
-178
lines changed

2 files changed

+264
-178
lines changed

UTILS/dfextensions/optimized/groupby_regression_optimized.py

Lines changed: 156 additions & 178 deletions
Original file line numberDiff line numberDiff line change
@@ -183,188 +183,167 @@ class GroupByRegressorOptimized:
183183

184184
@staticmethod
185185
def make_parallel_fit_optimized(
186-
df: pd.DataFrame,
187-
gb_columns: List[str],
188-
fit_columns: List[str],
189-
linear_columns: List[str],
190-
median_columns: List[str],
191-
weights: str,
192-
suffix: str,
193-
selection: pd.Series,
194-
addPrediction: bool = False,
195-
cast_dtype: Union[str, None] = None,
196-
n_jobs: int = 1,
197-
min_stat: Union[int, List[int]] = 10,
198-
sigmaCut: float = 5.0,
199-
fitter: Union[str, Callable] = "ols",
200-
batch_size: Union[str, int] = "auto",
201-
batch_strategy: str = "auto",
202-
max_refits: int = 10,
203-
small_group_threshold: int = 30,
204-
min_batch_size: int = 10,
205-
backend: str = 'loky',
186+
df: pd.DataFrame,
187+
gb_columns: List[str],
188+
fit_columns: List[str],
189+
linear_columns: List[str],
190+
median_columns: List[str],
191+
weights: str,
192+
suffix: str,
193+
selection: pd.Series,
194+
addPrediction: bool = False,
195+
cast_dtype: Union[str, None] = None,
196+
n_jobs: int = 1,
197+
min_stat: Union[int, List[int]] = 10,
198+
sigmaCut: float = 5.0,
199+
fitter: Union[str, Callable] = "ols",
200+
batch_size: Union[str, int] = "auto",
201+
batch_strategy: str = "auto",
202+
max_refits: int = 10,
203+
small_group_threshold: int = 30,
204+
min_batch_size: int = 10,
205+
backend: str = 'loky',
206206
) -> Tuple[pd.DataFrame, pd.DataFrame]:
207207
"""
208208
Optimized parallel fitting with array-based data passing and smart batching.
209-
210-
New parameters:
211-
batch_strategy: "auto", "no_batching", "size_bucketing"
212-
- auto: Choose based on group size distribution
213-
- no_batching: Original behavior (each group is a task)
214-
- size_bucketing: Batch small groups together
215-
small_group_threshold: Groups smaller than this are considered "small"
216-
min_batch_size: Minimum number of small groups per batch
217-
backend: "loky", "threading", "multiprocessing"
218-
- loky (default): Process-based, best for medium/large groups (>50 rows)
219-
- threading: Thread-based, best for small groups (<15 rows) if GIL-free
220-
- multiprocessing: Process-based, alternative to loky
221-
222-
Performance improvements:
223-
- Avoids DataFrame slicing in workers (60-80% overhead reduction)
224-
- Batches small groups to reduce spawn overhead (2-5× faster for small groups)
225-
- Better memory locality
226-
- Threading backend for GIL-free operations (10× faster for tiny groups)
227-
228-
Note on min_stat:
229-
The optimized version simplifies per-predictor min_stat checks for performance.
230-
If min_stat=[10, 20], the original would skip predictors individually per group.
231-
The optimized version uses min(min_stat) for the entire group.
232-
For most use cases (same min_stat for all predictors), behavior is identical.
233-
If you need strict per-predictor filtering, use the original implementation.
234209
"""
235-
if isinstance(weights, str) and weights not in df.columns:
236-
raise ValueError(f"Weight column '{weights}' not found")
237-
238-
if isinstance(min_stat, int):
239-
min_stat = [min_stat] * len(linear_columns)
240-
241-
# Warn if user provided different min_stat per predictor
242-
if len(set(min_stat)) > 1:
243-
logging.warning(
244-
f"Optimized version uses min(min_stat)={min(min_stat)} for all predictors. "
245-
f"Per-predictor filtering (min_stat={min_stat}) is not supported. "
246-
f"Use original implementation if this is required."
247-
)
248-
249-
df_selected = df.loc[selection].copy()
250-
251-
# Pre-extract arrays (done once in parent process)
252-
X_all = df_selected[linear_columns].values.astype(np.float64)
253-
w_all = df_selected[weights].values.astype(np.float64)
254-
255-
# For targets, we'll handle them one at a time to save memory
256-
target_results = []
257-
258-
for target_idx, target_col in enumerate(fit_columns):
259-
y_all = df_selected[target_col].values.astype(np.float64)
260-
261-
# Group and filter
262-
grouped = df_selected.groupby(gb_columns)
263-
filtered_items = [
264-
(key, idxs.values)
265-
for key, idxs in grouped.groups.items()
266-
if len(idxs) >= min(min_stat)
267-
]
268-
269-
if not filtered_items:
270-
logging.warning(f"No groups passed filtering for {target_col}")
271-
continue
272-
273-
# Decide on batching strategy
274-
if batch_strategy == "auto":
275-
group_sizes = [len(idxs) for _, idxs in filtered_items]
276-
median_size = np.median(group_sizes)
277-
pct_small = np.mean([s < small_group_threshold for s in group_sizes])
278-
279-
if pct_small > 0.7 and n_jobs > 1:
280-
batch_strategy = "size_bucketing"
281-
else:
282-
batch_strategy = "no_batching"
283-
284-
logging.info(f"Auto-selected batch_strategy={batch_strategy} "
285-
f"(median_size={median_size:.1f}, pct_small={pct_small:.1%})")
286-
287-
# Process groups
288-
if batch_strategy == "size_bucketing" and n_jobs > 1:
289-
# Separate small and large groups
290-
small_groups = []
291-
large_groups = []
292-
293-
for key, idxs in filtered_items:
294-
if len(idxs) < small_group_threshold:
295-
small_groups.append((key, idxs))
296-
else:
297-
large_groups.append((key, idxs))
298-
299-
# Batch small groups
300-
small_batches = [
301-
small_groups[i:i+min_batch_size]
302-
for i in range(0, len(small_groups), min_batch_size)
303-
]
304-
305-
logging.info(f"Processing {len(large_groups)} large groups + "
306-
f"{len(small_groups)} small groups in {len(small_batches)} batches")
307-
308-
# Process large groups individually
309-
large_results = Parallel(n_jobs=n_jobs, backend=backend)(
310-
delayed(process_group_array_based)(
311-
key, idxs, X_all, y_all, w_all, gb_columns,
312-
target_idx, list(range(len(linear_columns))),
313-
min(min_stat), sigmaCut, fitter, max_refits
314-
)
315-
for key, idxs in large_groups
210+
logger = logging.getLogger(__name__)
211+
if isinstance(min_stat, list):
212+
min_stat = min(min_stat) if len(min_stat) > 0 else 1
213+
214+
# Apply selection
215+
df_selected = df[selection].copy()
216+
if df_selected.empty:
217+
return df.assign(**{f"{col}{suffix}": np.nan for col in fit_columns}), \
218+
pd.DataFrame(columns=gb_columns)
219+
220+
# Prepare arrays (array-based path)
221+
y_matrix = df_selected[fit_columns].to_numpy()
222+
X_all = df_selected[linear_columns].to_numpy()
223+
w_all = df_selected[weights].to_numpy() if isinstance(weights, str) else np.ones(len(df_selected))
224+
225+
# Group indices (array-based)
226+
grouped = df_selected.groupby(gb_columns, sort=False, observed=True)
227+
groups_items = list(grouped.groups.items())
228+
229+
# Choose batching strategy
230+
def choose_strategy():
231+
if batch_strategy in ("no_batching", "size_bucketing"):
232+
return batch_strategy
233+
# auto
234+
sizes = np.array([len(idxs) for _, idxs in groups_items])
235+
if (sizes <= small_group_threshold).mean() > 0.7 and len(groups_items) > 50:
236+
return "size_bucketing"
237+
return "no_batching"
238+
239+
strategy = choose_strategy()
240+
241+
# Pre-build y index per target
242+
target_indices = {t: i for i, t in enumerate(fit_columns)}
243+
244+
target_results: List[Tuple[str, List[dict]]] = []
245+
246+
for target_col in fit_columns:
247+
target_idx = target_indices[target_col]
248+
249+
# batching
250+
if strategy == "size_bucketing":
251+
small = [(k, idxs) for k, idxs in groups_items if len(idxs) < small_group_threshold]
252+
large = [(k, idxs) for k, idxs in groups_items if len(idxs) >= small_group_threshold]
253+
254+
# Bucket small groups
255+
small_sorted = sorted(small, key=lambda kv: len(kv[1]), reverse=True)
256+
buckets: List[List[Tuple[tuple, np.ndarray]]] = []
257+
current: List[Tuple[tuple, np.ndarray]] = []
258+
current_size = 0
259+
for k, idxs in small_sorted:
260+
current.append((k, idxs))
261+
current_size += len(idxs)
262+
if current_size >= max(min_batch_size, small_group_threshold):
263+
buckets.append(current)
264+
current = []
265+
current_size = 0
266+
if current:
267+
buckets.append(current)
268+
269+
def process_bucket(bucket):
270+
out = []
271+
for key, idxs in bucket:
272+
out.append(process_group_array_based(
273+
key, idxs, X_all, y_matrix[:, target_idx], w_all,
274+
gb_columns, target_idx, list(range(len(linear_columns))),
275+
min_stat, sigmaCut, fitter, max_refits
276+
))
277+
return out
278+
279+
results_small = Parallel(n_jobs=n_jobs, backend=backend)(
280+
delayed(process_bucket)(b) for b in buckets
316281
)
317-
318-
# Process small groups in batches
319-
small_batch_results = Parallel(n_jobs=n_jobs, backend=backend)(
320-
delayed(process_batch_of_groups)(
321-
batch, X_all, y_all, w_all, gb_columns,
322-
target_idx, list(range(len(linear_columns))),
323-
min(min_stat), sigmaCut, fitter, max_refits
282+
results_small = [r for sub in results_small for r in sub]
283+
284+
# Large groups individually
285+
results_large = Parallel(n_jobs=n_jobs, batch_size=batch_size, backend=backend)(
286+
delayed(process_group_array_based)(
287+
key, idxs, X_all, y_matrix[:, target_idx], w_all,
288+
gb_columns, target_idx, list(range(len(linear_columns))),
289+
min_stat, sigmaCut, fitter, max_refits
324290
)
325-
for batch in small_batches
291+
for key, idxs in large
326292
)
327-
328-
# Flatten batched results
329-
small_results = [r for batch in small_batch_results for r in batch]
330-
results = large_results + small_results
331-
293+
294+
results = results_small + results_large
295+
332296
else:
333297
# Original approach: each group is a task
334298
results = Parallel(n_jobs=n_jobs, batch_size=batch_size, backend=backend)(
335299
delayed(process_group_array_based)(
336-
key, idxs, X_all, y_all, w_all, gb_columns,
337-
target_idx, list(range(len(linear_columns))),
338-
min(min_stat), sigmaCut, fitter, max_refits
300+
key, idxs, X_all, y_matrix[:, target_idx], w_all,
301+
gb_columns, target_idx, list(range(len(linear_columns))),
302+
min_stat, sigmaCut, fitter, max_refits
339303
)
340-
for key, idxs in filtered_items
304+
for key, idxs in groups_items
341305
)
342-
306+
343307
target_results.append((target_col, results))
344-
345-
# Construct dfGB
346-
dfGB = pd.DataFrame([r for _, results in target_results for r in results])
347-
348-
# Expand coefficients into separate columns (only if coefficients exist)
349-
if not dfGB.empty and 'coefficients' in dfGB.columns:
350-
for target_col, results in target_results:
351-
for i, pred_col in enumerate(linear_columns):
352-
col_name = f"{target_col}_slope_{pred_col}"
353-
dfGB[col_name] = dfGB['coefficients'].apply(
354-
lambda x: x[i] if isinstance(x, np.ndarray) and len(x) > i else np.nan
355-
)
356-
357-
if 'intercept' in dfGB.columns:
358-
dfGB[f"{target_col}_intercept"] = dfGB['intercept']
359-
if 'rms' in dfGB.columns:
360-
dfGB[f"{target_col}_rms"] = dfGB['rms']
361-
if 'mad' in dfGB.columns:
362-
dfGB[f"{target_col}_mad"] = dfGB['mad']
363-
364-
# Remove temporary columns
365-
dfGB = dfGB.drop(columns=['coefficients', 'intercept', 'rms', 'mad'], errors='ignore')
366-
367-
# Add medians
308+
309+
# Construct dfGB: merge target results horizontally (one row per group)
310+
dfGB = None
311+
for t_idx, (target_col, results) in enumerate(target_results):
312+
df_t = pd.DataFrame(results)
313+
if df_t.empty:
314+
continue
315+
# Expand coefficients into per-predictor columns for this target
316+
# Expand coefficients into per-predictor columns for this target
317+
if 'coefficients' in df_t.columns:
318+
for idx, pred_col in enumerate(linear_columns):
319+
colname = f"{target_col}_slope_{pred_col}"
320+
df_t[colname] = [
321+
(arr[idx] if isinstance(arr, (np.ndarray, list, tuple)) and len(arr) > idx else np.nan)
322+
for arr in df_t['coefficients']
323+
]
324+
if 'intercept' in df_t.columns:
325+
df_t[f"{target_col}_intercept"] = df_t['intercept']
326+
if 'rms' in df_t.columns:
327+
df_t[f"{target_col}_rms"] = df_t['rms']
328+
if 'mad' in df_t.columns:
329+
df_t[f"{target_col}_mad"] = df_t['mad']
330+
331+
# Drop temp columns; for additional targets keep only gb keys + target-specific cols
332+
drop_cols = ['coefficients', 'intercept', 'rms', 'mad']
333+
if t_idx > 0:
334+
keep_cols = set(gb_columns) | {c for c in df_t.columns if c.startswith(f"{target_col}_")}
335+
df_t = df_t[[c for c in df_t.columns if c in keep_cols]]
336+
df_t = df_t.drop(columns=[c for c in drop_cols if c in df_t.columns], errors='ignore')
337+
338+
if dfGB is None:
339+
dfGB = df_t
340+
else:
341+
dfGB = dfGB.merge(df_t, on=gb_columns, how='left')
342+
343+
if dfGB is None:
344+
dfGB = pd.DataFrame(columns=gb_columns)
345+
346+
# Add medians (per-group)
368347
if median_columns:
369348
median_results = []
370349
for key, idxs in grouped.groups.items():
@@ -374,19 +353,17 @@ def make_parallel_fit_optimized(
374353
median_results.append(group_dict)
375354
df_medians = pd.DataFrame(median_results)
376355
dfGB = dfGB.merge(df_medians, on=gb_columns, how='left')
377-
378-
# Cast dtypes
356+
357+
# Cast dtypes for numeric fit metrics
379358
if cast_dtype:
380359
for col in dfGB.columns:
381360
if any(x in col for x in ['slope', 'intercept', 'rms', 'mad']):
382361
dfGB[col] = dfGB[col].astype(cast_dtype)
383-
384-
# Add suffix
385-
dfGB = dfGB.rename(
386-
columns={col: f"{col}{suffix}" for col in dfGB.columns if col not in gb_columns}
387-
)
388-
389-
# Add predictions
362+
363+
# Add suffix (keep gb_columns unchanged)
364+
dfGB = dfGB.rename(columns={col: f"{col}{suffix}" for col in dfGB.columns if col not in gb_columns})
365+
366+
# Optionally add predictions back to the input df
390367
if addPrediction and not dfGB.empty:
391368
df = df.merge(dfGB, on=gb_columns, how="left")
392369
for target_col in fit_columns:
@@ -398,10 +375,11 @@ def make_parallel_fit_optimized(
398375
slope_col = f"{target_col}_slope_{pred_col}{suffix}"
399376
if slope_col in df.columns:
400377
df[f"{target_col}{suffix}"] += df[slope_col] * df[pred_col]
401-
378+
402379
return df, dfGB
403380

404381

382+
405383
# Convenience wrapper for backward compatibility
406384
def make_parallel_fit_v2(
407385
df: pd.DataFrame,

0 commit comments

Comments
 (0)