@@ -741,145 +741,148 @@ def _ols_kernel_numba_weighted(X_all, Y_all, W_all, offsets, n_groups, n_feat, n
741741 for tt in range (n_tgt ):
742742 out_beta [g , i , tt ] = B [i , tt ]
743743
744-
def make_parallel_fit_v4(
    *,
    df,
    gb_columns,
    fit_columns,
    linear_columns,
    median_columns=None,
    weights=None,
    suffix="_v4",
    selection=None,
    addPrediction=False,
    cast_dtype="float64",
    min_stat=3,
    diag=False,
    diag_prefix="diag_",
):
    """
    Phase 3 (v4): Numba JIT weighted OLS with *fast* multi-column groupby support.

    Key points:
      - Group boundaries via vectorized adjacent-row comparisons per key column
        (no per-group ``np.unique`` / structured-record round trip).
      - Vectorized dfGB assembly (no per-group ``iloc``).
      - Falls back to a pure-NumPy weighted loop when the Numba kernel
        ``_ols_kernel_numba_weighted`` is not defined.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data.
    gb_columns : str or sequence of str
        Group key column(s); groups are made contiguous via a stable sort.
    fit_columns : sequence of str
        Target columns — one OLS fit per target, per group.
    linear_columns : sequence of str
        Regressor columns (an intercept term is always added).
    median_columns : sequence of str, optional
        Accepted for v3 schema parity; columns are validated but not
        otherwise used here.
    weights : str, optional
        Name of a per-row weight column; uniform weights if None.
    suffix : str
        Appended to every output coefficient / diagnostic column name.
    selection : boolean mask or index, optional
        Row filter applied via ``df.loc[selection]`` before fitting.
    addPrediction : bool
        If True, merge the fitted coefficients back and add per-row
        ``{target}_pred{suffix}`` columns to the returned row-level frame.
    cast_dtype : str or None
        dtype for the dense X/Y matrices and for down-casting float
        coefficient columns in dfGB (None -> float64, no dfGB cast).
    min_stat : int or sequence of int
        Minimum rows per group to attempt a fit; a sequence collapses to
        its maximum (v3 compatibility).
    diag : bool
        If True, add per-group residual RMS and MAD diagnostic columns.
    diag_prefix : str
        Prefix for diagnostic column names.

    Returns
    -------
    (df_out, dfGB) : tuple of pandas.DataFrame
        Row-level frame (sorted by group keys; with predictions when
        requested) and the per-group coefficient / diagnostic frame.

    Raises
    ------
    KeyError
        If any required column is missing from ``df``.
    """
    import numpy as np
    import pandas as pd

    if median_columns is None:
        median_columns = []
    # v3 compatibility: a per-column min_stat sequence collapses to its max.
    if isinstance(min_stat, (list, tuple)):
        min_stat = int(np.max(min_stat))
    min_stat = int(min_stat)

    # Row filter
    if selection is not None:
        df = df.loc[selection]

    # Normalize group columns to a list
    gb_cols = [gb_columns] if isinstance(gb_columns, str) else list(gb_columns)

    # Validate all referenced columns up front (incl. median_columns, so a
    # typo fails here with a clear message instead of deep inside pandas).
    needed = set(gb_cols) | set(linear_columns) | set(fit_columns) | set(median_columns)
    if weights is not None:
        needed.add(weights)
    missing = [c for c in needed if c not in df.columns]
    if missing:
        raise KeyError(f"Missing required columns: {missing}")

    # Stable sort by all group columns so each group is a contiguous block
    df_sorted = df.sort_values(gb_cols, kind="mergesort")

    # Dense numeric matrices
    dtype_num = np.float64 if cast_dtype is None else cast_dtype
    X_all = df_sorted[linear_columns].to_numpy(dtype=dtype_num, copy=False)
    Y_all = df_sorted[fit_columns].to_numpy(dtype=dtype_num, copy=False)
    W_all = (np.ones(len(df_sorted), dtype=np.float64) if weights is None
             else df_sorted[weights].to_numpy(dtype=np.float64, copy=False))

    N = X_all.shape[0]
    if N == 0:
        empty_cols = gb_cols + [f"n_refits{suffix}", f"n_used{suffix}", f"frac_rejected{suffix}"]
        return df_sorted.copy(), pd.DataFrame(columns=empty_cols)

    n_feat = X_all.shape[1]
    n_tgt = Y_all.shape[1]

    # ---------- FAST multi-column group offsets ----------
    # boundaries[0] = True; boundaries[i] = True iff any key column changes
    # between row i-1 and row i (vectorized adjacent compare, OR-ed per key).
    boundaries = np.empty(N, dtype=bool)
    boundaries[0] = True
    if N > 1:
        boundaries[1:] = False
        for col in gb_cols:
            a = df_sorted[col].to_numpy()
            boundaries[1:] |= (a[1:] != a[:-1])

    starts = np.flatnonzero(boundaries)
    offsets = np.empty(len(starts) + 1, dtype=np.int64)
    offsets[:-1] = starts
    offsets[-1] = N
    n_groups = len(starts)
    # ----------------------------------------------------

    # Coefficients: [n_groups, 1 + n_feat, n_tgt] (row 0 = intercept)
    beta = np.zeros((n_groups, n_feat + 1, n_tgt), dtype=np.float64)

    # Numba kernel (weighted) when available, else pure-NumPy fallback.
    try:
        _ols_kernel_numba_weighted(X_all, Y_all, W_all, offsets,
                                   n_groups, n_feat, n_tgt, min_stat, beta)
    except NameError:
        for gi in range(n_groups):
            i0, i1 = offsets[gi], offsets[gi + 1]
            m = i1 - i0
            # Skip sparse or underdetermined groups (need > n_feat rows for
            # the (n_feat+1)-parameter system to have full rank).
            if m < min_stat or m <= n_feat:
                continue
            Xg = X_all[i0:i1]
            Yg = Y_all[i0:i1]
            Wg = W_all[i0:i1].reshape(-1)
            X1 = np.c_[np.ones(m), Xg]
            # Weighted normal equations: XtX = X1' W X1, XtY = X1' W Y
            XtX = (X1.T * Wg).dot(X1)
            XtY = (X1.T * Wg).dot(Yg)
            try:
                beta[gi, :, :] = np.linalg.solve(XtX, XtY)
            except np.linalg.LinAlgError:
                pass  # singular group: leave zeros

    # ---------- Vectorized dfGB assembly ----------
    # First row of each group supplies the key values (no per-group iloc).
    key_arrays = {col: df_sorted[col].to_numpy()[starts] for col in gb_cols}

    # Bookkeeping columns (schema parity with v3; refit/reject not used here).
    n_refits_arr = np.zeros(n_groups, dtype=np.int32)
    n_used_arr = (offsets[1:] - offsets[:-1]).astype(np.int32)
    frac_rej_arr = np.zeros(n_groups, dtype=np.float64)

    out_dict = {col: key_arrays[col] for col in gb_cols}
    out_dict[f"n_refits{suffix}"] = n_refits_arr
    out_dict[f"n_used{suffix}"] = n_used_arr
    out_dict[f"frac_rejected{suffix}"] = frac_rej_arr

    # Intercept + slopes, one flat column per (target, regressor)
    for t_idx, tname in enumerate(fit_columns):
        out_dict[f"{tname}_intercept{suffix}"] = beta[:, 0, t_idx].astype(np.float64, copy=False)
        for j, cname in enumerate(linear_columns, start=1):
            out_dict[f"{tname}_slope_{cname}{suffix}"] = beta[:, j, t_idx].astype(np.float64, copy=False)

    # Optional diagnostics: residual RMS and MAD per group and target
    if diag:
        for t_idx, tname in enumerate(fit_columns):
            rms = np.zeros(n_groups, dtype=np.float64)
            mad = np.zeros(n_groups, dtype=np.float64)
            for gi in range(n_groups):
                i0, i1 = offsets[gi], offsets[gi + 1]
                m = i1 - i0
                if m == 0:
                    continue
                Xg = X_all[i0:i1]
                y = Y_all[i0:i1, t_idx]
                X1 = np.c_[np.ones(m), Xg]
                resid = y - (X1 @ beta[gi, :, t_idx])
                rms[gi] = np.sqrt(np.mean(resid ** 2))
                mad[gi] = np.median(np.abs(resid - np.median(resid)))
            out_dict[f"{diag_prefix}{tname}_rms{suffix}"] = rms
            out_dict[f"{diag_prefix}{tname}_mad{suffix}"] = mad

    dfGB = pd.DataFrame(out_dict)
    # ---------- end dfGB assembly ----------

    # Optional down-cast of float coefficient columns (never the group keys).
    if cast_dtype is not None and len(dfGB):
        dfGB = dfGB.astype({
            c: cast_dtype
            for c in dfGB.columns
            if c not in gb_cols and dfGB[c].dtype == "float64"
        })

    # Optional prediction join (v3 parity: intercept + sum(slope_j * x_j)).
    df_out = df_sorted
    if addPrediction and len(dfGB):
        df_out = df_sorted.merge(dfGB, on=gb_cols, how="left")
        for tname in fit_columns:
            pred = df_out[f"{tname}_intercept{suffix}"].copy()
            for cname in linear_columns:
                pred += df_out[f"{tname}_slope_{cname}{suffix}"] * df_out[cname]
            df_out[f"{tname}_pred{suffix}"] = pred

    return df_out, dfGB
0 commit comments