From 2db1042a8783ad7cfbf126b0d6076aeff9524197 Mon Sep 17 00:00:00 2001 From: Paddy Mullen Date: Mon, 23 Feb 2026 07:15:02 -0500 Subject: [PATCH 1/2] refactor: abstract CustomizableDataflow into pandas/polars subclasses Make CustomizableDataflow an abstract base class with concrete PandasCustomizableDataflow and PolarsCustomizableDataflow subclasses. Widgets select their backend via a dataflow_klass attribute. This decouples pandas from the core dataflow module, enabling pandas to become an optional dependency in the future. Co-Authored-By: Claude Opus 4.6 --- buckaroo/buckaroo_widget.py | 8 +-- buckaroo/dataflow/dataflow.py | 60 ++++++++----------- buckaroo/dataflow/dataflow_extras.py | 10 +--- buckaroo/dataflow/pandas_dataflow.py | 50 ++++++++++++++++ buckaroo/dataflow/polars_dataflow.py | 53 ++++++++++++++++ buckaroo/polars_buckaroo.py | 15 +---- buckaroo/server/data_loading.py | 4 +- scripts/smoke_test.py | 4 +- tests/unit/dataflow/autocleaning_pd_test.py | 4 +- .../dataflow/customizable_dataflow_test.py | 5 +- 10 files changed, 144 insertions(+), 69 deletions(-) create mode 100644 buckaroo/dataflow/pandas_dataflow.py create mode 100644 buckaroo/dataflow/polars_dataflow.py diff --git a/buckaroo/buckaroo_widget.py b/buckaroo/buckaroo_widget.py index 64d26d4ed..69828b366 100644 --- a/buckaroo/buckaroo_widget.py +++ b/buckaroo/buckaroo_widget.py @@ -27,7 +27,7 @@ from buckaroo.extension_utils import copy_extend from .serialization_utils import EMPTY_DF_WHOLE, check_and_fix_df, pd_to_obj, to_parquet, sd_to_parquet_b64 -from .dataflow.dataflow import CustomizableDataflow +from .dataflow.pandas_dataflow import PandasCustomizableDataflow from .dataflow.dataflow_extras import (Sampling, exception_protect) from .dataflow.styling_core import (ComponentConfig, DFViewerConfig, DisplayArgs, OverrideColumnConfig, PinnedRowConfig, StylingAnalysis, merge_column_config, EMPTY_DFVIEWER_CONFIG) from .dataflow.autocleaning import PandasAutocleaning @@ -124,16 +124,13 @@ def __init__(self, orig_df, debug=False, self.record_transcript = record_transcript self.exception = None kls = self.__class__ - class InnerDataFlow(CustomizableDataflow): + class InnerDataFlow(kls.dataflow_klass): sampling_klass = kls.sampling_klass autocleaning_klass = kls.autocleaning_klass DFStatsClass = kls.DFStatsClass autoclean_conf= kls.autoclean_conf analysis_klasses = kls.analysis_klasses - def _df_to_obj(idfself, df:pd.DataFrame): - return self._df_to_obj(df) - self.dataflow = InnerDataFlow( orig_df, debug=debug,column_config_overrides=column_config_overrides, @@ -162,6 +159,7 @@ def _df_to_obj(self, df:pd.DataFrame): render_func_name = Unicode("baked").tag(sync=True) + dataflow_klass = PandasCustomizableDataflow sampling_klass = PdSampling autocleaning_klass = PandasAutocleaning #override the base CustomizableDataFlow klass DFStatsClass = DfStatsV2 # Pandas Specific diff --git a/buckaroo/dataflow/dataflow.py b/buckaroo/dataflow/dataflow.py index a84ab870d..c1f901629 100644 --- a/buckaroo/dataflow/dataflow.py +++ b/buckaroo/dataflow/dataflow.py @@ -1,14 +1,13 @@ +from abc import abstractmethod from typing import List, Literal, Tuple, Type, TypedDict, Dict as TDict, Any as TAny, Union from typing_extensions import override import six import warnings -import pandas as pd from traitlets import Unicode, Any, observe, Dict from buckaroo.pluggable_analysis_framework.col_analysis import ColAnalysis, SDType -from ..serialization_utils import pd_to_obj, sd_to_parquet_b64 +from ..serialization_utils import sd_to_parquet_b64 from buckaroo.pluggable_analysis_framework.utils import (filter_analysis) -from buckaroo.pluggable_analysis_framework.df_stats_v2 import DfStatsV2 from .autocleaning import SentinelAutocleaning from .dataflow_extras import (exception_protect, Sampling) from .styling_core import ( @@ -92,7 +91,7 @@ def __init__(self, raw_df): - def _compute_sampled_df(self, raw_df:pd.DataFrame, sample_method:str): + def _compute_sampled_df(self, raw_df, sample_method): if sample_method == "first": return raw_df[:1] return raw_df @@ -185,7 +184,7 @@ def processed_sd(self) -> SDType: return self.processed_result[1] return {} - def _get_summary_sd(self, df:pd.DataFrame) -> Tuple[SDType, TAny]: + def _get_summary_sd(self, df) -> Tuple[SDType, TAny]: analysis_klasses = self.analysis_klasses if analysis_klasses == "foo": return {'some-col': {'foo':8}}, {} @@ -236,12 +235,15 @@ def _widget_config(self, change): class CustomizableDataflow(DataFlow): """ - This allows targetd extension and customization of DataFlow + This allows targetd extension and customization of DataFlow. + + This is an abstract base class — use PandasCustomizableDataflow or + PolarsCustomizableDataflow for concrete implementations. """ #analysis_klasses = [StylingAnalysis] analysis_klasses: List[Type[ColAnalysis]] = [StylingAnalysis] command_config = Dict({}).tag(sync=True) - DFStatsClass = DfStatsV2 + DFStatsClass = None sampling_klass = Sampling df_display_klasses: TDict[str, Type[StylingAnalysis]] = {} @@ -323,7 +325,8 @@ def setup_options_from_analysis(self): empty_df_display_args[kls.df_display_name] = EMPTY_DF_DISPLAY_ARG - self.DFStatsClass.verify_analysis_objects(self.analysis_klasses) + if self.DFStatsClass is not None: + self.DFStatsClass.verify_analysis_objects(self.analysis_klasses) self.post_processing_klasses = filter_analysis(self.analysis_klasses, "post_processing_method") @@ -379,38 +382,20 @@ def run_code_generator(self, operations): self.ac_obj.run_code_generator(operations) ### end code interpeter block - @override - def _compute_processed_result(self, cleaned_df:pd.DataFrame, post_processing_method:str) -> Tuple[pd.DataFrame, SDType]: - if post_processing_method == '': - return (cleaned_df, {}) - else: - post_analysis = self.post_processing_klasses[post_processing_method] - try: - ret_df, sd = post_analysis.post_process_df(cleaned_df) - return (ret_df, sd) - except Exception as e: - return (self._build_error_dataframe(e), {}) + @abstractmethod + def _compute_processed_result(self, cleaned_df, post_processing_method): + ... + @abstractmethod def _build_error_dataframe(self, e): - return pd.DataFrame({'err': [str(e)]}) + ... ### start summary stats block #TAny closer to some error type - @override - def _get_summary_sd(self, processed_df:pd.DataFrame) -> Tuple[SDType, TDict[str, TAny]]: - stats = self.DFStatsClass( - processed_df, - self.analysis_klasses, - self.df_name, debug=self.debug) - sdf = stats.sdf - if stats.errs: - if self.debug: - raise Exception("Error executing analysis") - else: - return {}, stats.errs - else: - return sdf, {} + @abstractmethod + def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]: + ... # ### end summary stats block @@ -422,13 +407,16 @@ def _sd_to_jsondf(self, sd:SDType): """ return sd_to_parquet_b64(sd) - def _df_to_obj(self, df:pd.DataFrame) -> TDict[str, TAny]: - return pd_to_obj(self.sampling_klass.serialize_sample(df)) + @abstractmethod + def _df_to_obj(self, df) -> TDict[str, TAny]: + ... def add_analysis(self, analysis_klass:Type[ColAnalysis]) -> None: """ same as get_summary_sd, call whatever to set summary_sd and trigger further comps """ + if self.DFStatsClass is None: + return stats = self.DFStatsClass( self.processed_df, diff --git a/buckaroo/dataflow/dataflow_extras.py b/buckaroo/dataflow/dataflow_extras.py index 1e1ebcb22..06f2d05b2 100644 --- a/buckaroo/dataflow/dataflow_extras.py +++ b/buckaroo/dataflow/dataflow_extras.py @@ -2,7 +2,6 @@ import logging -import pandas as pd logger = logging.getLogger() @@ -13,8 +12,6 @@ 'summary_stats_key': 'empty'} -SENTINEL_DF_1 = pd.DataFrame({'foo' :[10, 20], 'bar' : ["asdf", "iii"]}) -SENTINEL_DF_2 = pd.DataFrame({'col1' :[55, 55], 'col2': ["pppp", "333"]}) SENTINEL_COLUMN_CONFIG_1 = "ASDF" SENTINEL_COLUMN_CONFIG_2 = "FOO-BAR" @@ -65,10 +62,7 @@ def pre_stats_sample(kls, df): print("Removing excess columns, found %d columns" % len(df.columns)) df = df[df.columns[:kls.max_columns]] if kls.pre_limit and len(df) > kls.pre_limit: - sampled = df.sample(kls.pre_limit) - if isinstance(sampled, pd.DataFrame): - return sampled.sort_index() - return sampled + return df.sample(kls.pre_limit) return df @@ -76,7 +70,7 @@ def pre_stats_sample(kls, df): def serialize_sample(kls, df): if kls.serialize_limit and len(df) > kls.serialize_limit: sampled = df.sample(kls.serialize_limit) - if isinstance(sampled, pd.DataFrame): + if hasattr(sampled, 'sort_index'): return sampled.sort_index() return sampled return df diff --git a/buckaroo/dataflow/pandas_dataflow.py b/buckaroo/dataflow/pandas_dataflow.py new file mode 100644 index 000000000..f79f403ac --- /dev/null +++ b/buckaroo/dataflow/pandas_dataflow.py @@ -0,0 +1,50 @@ +from typing import Tuple, Dict as TDict, Any as TAny + +import pandas as pd +from typing_extensions import override + +from buckaroo.pluggable_analysis_framework.col_analysis import SDType +from buckaroo.pluggable_analysis_framework.df_stats_v2 import DfStatsV2 +from ..serialization_utils import pd_to_obj +from .dataflow import CustomizableDataflow + + +class PandasCustomizableDataflow(CustomizableDataflow): + """Concrete pandas implementation of CustomizableDataflow.""" + + DFStatsClass = DfStatsV2 + + @override + def _compute_processed_result(self, cleaned_df, post_processing_method): + if post_processing_method == '': + return (cleaned_df, {}) + else: + post_analysis = self.post_processing_klasses[post_processing_method] + try: + ret_df, sd = post_analysis.post_process_df(cleaned_df) + return (ret_df, sd) + except Exception as e: + return (self._build_error_dataframe(e), {}) + + @override + def _build_error_dataframe(self, e): + return pd.DataFrame({'err': [str(e)]}) + + @override + def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]: + stats = self.DFStatsClass( + processed_df, + self.analysis_klasses, + self.df_name, debug=self.debug) + sdf = stats.sdf + if stats.errs: + if self.debug: + raise Exception("Error executing analysis") + else: + return {}, stats.errs + else: + return sdf, {} + + @override + def _df_to_obj(self, df) -> TDict[str, TAny]: + return pd_to_obj(self.sampling_klass.serialize_sample(df)) diff --git a/buckaroo/dataflow/polars_dataflow.py b/buckaroo/dataflow/polars_dataflow.py new file mode 100644 index 000000000..0a024de26 --- /dev/null +++ b/buckaroo/dataflow/polars_dataflow.py @@ -0,0 +1,53 @@ +from typing import Tuple, Dict as TDict, Any as TAny + +import pandas as pd +import polars as pl +from typing_extensions import override + +from buckaroo.pluggable_analysis_framework.col_analysis import SDType +from buckaroo.pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2 +from ..serialization_utils import pd_to_obj +from .dataflow import CustomizableDataflow + + +class PolarsCustomizableDataflow(CustomizableDataflow): + """Concrete polars implementation of CustomizableDataflow.""" + + DFStatsClass = PlDfStatsV2 + + @override + def _compute_processed_result(self, cleaned_df, post_processing_method): + if post_processing_method == '': + return (cleaned_df, {}) + else: + post_analysis = self.post_processing_klasses[post_processing_method] + try: + ret_df, sd = post_analysis.post_process_df(cleaned_df) + return (ret_df, sd) + except Exception as e: + return (self._build_error_dataframe(e), {}) + + @override + def _build_error_dataframe(self, e): + return pl.DataFrame({'err': [str(e)]}) + + @override + def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]: + stats = self.DFStatsClass( + processed_df, + self.analysis_klasses, + self.df_name, debug=self.debug) + sdf = stats.sdf + if stats.errs: + if self.debug: + raise Exception("Error executing analysis") + else: + return {}, stats.errs + else: + return sdf, {} + + @override + def _df_to_obj(self, df) -> TDict[str, TAny]: + if isinstance(df, pd.DataFrame): + return pd_to_obj(self.sampling_klass.serialize_sample(df)) + return pd_to_obj(self.sampling_klass.serialize_sample(df.to_pandas())) diff --git a/buckaroo/polars_buckaroo.py b/buckaroo/polars_buckaroo.py index e5b0442bc..6cb4ac9b5 100644 --- a/buckaroo/polars_buckaroo.py +++ b/buckaroo/polars_buckaroo.py @@ -9,10 +9,11 @@ from .pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2 from .pluggable_analysis_framework.polars_analysis_management import PlDfStats from .customizations.pl_stats_v2 import PL_ANALYSIS_V2 -from .serialization_utils import pd_to_obj, sd_to_parquet_b64 +from .serialization_utils import sd_to_parquet_b64 from .customizations.styling import DefaultSummaryStatsStyling, DefaultMainStyling from .customizations.pl_autocleaning_conf import NoCleaningConfPl from .dataflow.dataflow import Sampling +from .dataflow.polars_dataflow import PolarsCustomizableDataflow from .dataflow.autocleaning import PandasAutocleaning from .dataflow.widget_extension_utils import configure_buckaroo @@ -50,6 +51,7 @@ def make_origs(raw_df, cleaned_df, cleaning_sd): class PolarsBuckarooWidget(BuckarooWidget): """TODO: Add docstring here """ + dataflow_klass = PolarsCustomizableDataflow analysis_klasses = local_analysis_klasses autocleaning_klass = PandasAutocleaning #override the base CustomizableDataFlow klass autoclean_conf = tuple([NoCleaningConfPl]) #override the base CustomizableDataFlow conf @@ -60,17 +62,6 @@ def _sd_to_jsondf(self, sd): """Serialize summary stats dict as parquet-b64.""" return sd_to_parquet_b64(sd) - def _build_error_dataframe(self, e): - return pl.DataFrame({'err': [str(e)]}) - - def _df_to_obj(self, df): - # I want to this, but then row numbers are lost - #return pd_to_obj(self.sampling_klass.serialize_sample(df).to_pandas()) - import pandas as pd - if isinstance(df, pd.DataFrame): - return pd_to_obj(self.sampling_klass.serialize_sample(df)) - return pd_to_obj(self.sampling_klass.serialize_sample(df.to_pandas())) - def prepare_df_for_serialization(df:pl.DataFrame) -> pl.DataFrame: # I don't like this copy. modify to keep the same data with different names diff --git a/buckaroo/server/data_loading.py b/buckaroo/server/data_loading.py index d85fd4725..6f9db2a7f 100644 --- a/buckaroo/server/data_loading.py +++ b/buckaroo/server/data_loading.py @@ -6,7 +6,7 @@ from buckaroo.serialization_utils import to_parquet, pd_to_obj, check_and_fix_df from buckaroo.df_util import old_col_new_col, to_chars -from buckaroo.dataflow.dataflow import CustomizableDataflow +from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow from buckaroo.dataflow.dataflow_extras import Sampling from buckaroo.dataflow.autocleaning import PandasAutocleaning from buckaroo.dataflow.styling_core import StylingAnalysis @@ -36,7 +36,7 @@ def pre_stats_sample(kls, df): return df -class ServerDataflow(CustomizableDataflow): +class ServerDataflow(PandasCustomizableDataflow): """Headless dataflow matching BuckarooInfiniteWidget's pipeline.""" sampling_klass = ServerSampling autocleaning_klass = PandasAutocleaning diff --git a/scripts/smoke_test.py b/scripts/smoke_test.py index 63a206651..c398bd2f2 100644 --- a/scripts/smoke_test.py +++ b/scripts/smoke_test.py @@ -15,7 +15,7 @@ def test_base(): """Bare `pip install buckaroo` — pandas comes via fastparquet.""" import pandas as pd - from buckaroo.dataflow.dataflow import CustomizableDataflow + from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow from buckaroo.dataflow.autocleaning import PandasAutocleaning from buckaroo.customizations.pd_autoclean_conf import NoCleaningConf from buckaroo.serialization_utils import pd_to_obj, to_parquet @@ -23,7 +23,7 @@ def test_base(): df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]}) # Verify the full dataflow pipeline runs - class TestDataflow(CustomizableDataflow): + class TestDataflow(PandasCustomizableDataflow): autocleaning_klass = PandasAutocleaning autoclean_conf = tuple([NoCleaningConf]) diff --git a/tests/unit/dataflow/autocleaning_pd_test.py b/tests/unit/dataflow/autocleaning_pd_test.py index 158b2a5ff..fc48ac6c9 100644 --- a/tests/unit/dataflow/autocleaning_pd_test.py +++ b/tests/unit/dataflow/autocleaning_pd_test.py @@ -13,7 +13,7 @@ SafeInt, DropCol, FillNA, GroupBy, NoOp, Search, OnlyOutliers ) from buckaroo.customizations.pd_autoclean_conf import (NoCleaningConf) -from buckaroo.dataflow.dataflow import CustomizableDataflow +from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow dirty_df = pd.DataFrame( {'a':[10, 20, 30, 40, 10, 20.3, 5, None, None, None], @@ -500,7 +500,7 @@ def test_autoclean_dataflow(): """ verify that different autocleaning confs are actually called """ - class SentinelDataflow(CustomizableDataflow): + class SentinelDataflow(PandasCustomizableDataflow): autocleaning_klass = PandasAutocleaning autoclean_conf = tuple([SentinelConfig, NoCleaningConf]) diff --git a/tests/unit/dataflow/customizable_dataflow_test.py b/tests/unit/dataflow/customizable_dataflow_test.py index 26a77c738..2ae51fe7e 100644 --- a/tests/unit/dataflow/customizable_dataflow_test.py +++ b/tests/unit/dataflow/customizable_dataflow_test.py @@ -2,7 +2,8 @@ import pytest from ..fixtures import (DistinctCount) from buckaroo.pluggable_analysis_framework.col_analysis import (ColAnalysis) -from buckaroo.dataflow.dataflow import CustomizableDataflow, StylingAnalysis +from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow +from buckaroo.dataflow.dataflow import StylingAnalysis from buckaroo.buckaroo_widget import BuckarooWidget, BuckarooInfiniteWidget from buckaroo.jlisp.lisp_utils import (s, sQ) from buckaroo.dataflow.autocleaning import PandasAutocleaning @@ -44,7 +45,7 @@ 'extra_grid_config': {}, } -class ACDFC(CustomizableDataflow): +class ACDFC(PandasCustomizableDataflow): autocleaning_klass = PandasAutocleaning autoclean_conf = tuple([NoCleaningConf]) From 02ecf492b07f08b8e03b727666d872e5c45b9cd5 Mon Sep 17 00:00:00 2001 From: Paddy Mullen Date: Tue, 24 Feb 2026 08:52:36 -0500 Subject: [PATCH 2/2] simple commit so openAI does a review --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index a26bf3f8a..2a11ab6d9 100644 --- a/README.md +++ b/README.md @@ -276,3 +276,4 @@ Have you had a good experience with this project? Why not share some love and co We welcome [issue reports](../../issues); be sure to choose the proper issue template for your issue, so that we can be sure you're providing the necessary information. +