Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,4 @@ Have you had a good experience with this project? Why not share some love and co
We welcome [issue reports](../../issues); be sure to choose the proper issue template for your issue, so that we can be sure you're providing the necessary information.



8 changes: 3 additions & 5 deletions buckaroo/buckaroo_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from buckaroo.extension_utils import copy_extend

from .serialization_utils import EMPTY_DF_WHOLE, check_and_fix_df, pd_to_obj, to_parquet, sd_to_parquet_b64
from .dataflow.dataflow import CustomizableDataflow
from .dataflow.pandas_dataflow import PandasCustomizableDataflow
from .dataflow.dataflow_extras import (Sampling, exception_protect)
from .dataflow.styling_core import (ComponentConfig, DFViewerConfig, DisplayArgs, OverrideColumnConfig, PinnedRowConfig, StylingAnalysis, merge_column_config, EMPTY_DFVIEWER_CONFIG)
from .dataflow.autocleaning import PandasAutocleaning
Expand Down Expand Up @@ -124,16 +124,13 @@ def __init__(self, orig_df, debug=False,
self.record_transcript = record_transcript
self.exception = None
kls = self.__class__
class InnerDataFlow(CustomizableDataflow):
class InnerDataFlow(kls.dataflow_klass):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Route _df_to_obj through widget overrides

This refactor removed the InnerDataFlow._df_to_obj bridge, so _df_to_obj implementations on widget subclasses are no longer used during serialization. That breaks built-in override paths like GeopandasBase._df_to_obj (buckaroo/geopandas_buckaroo.py), which exists to coerce geopandas frames before pd_to_obj; after this change, serialization falls back to PandasCustomizableDataflow._df_to_obj and bypasses that conversion, causing geopandas widget payload generation to fail or produce incorrect data.

Useful? React with 👍 / 👎.

sampling_klass = kls.sampling_klass
autocleaning_klass = kls.autocleaning_klass
DFStatsClass = kls.DFStatsClass
autoclean_conf= kls.autoclean_conf
analysis_klasses = kls.analysis_klasses

def _df_to_obj(idfself, df:pd.DataFrame):
return self._df_to_obj(df)

self.dataflow = InnerDataFlow(
orig_df,
debug=debug,column_config_overrides=column_config_overrides,
Expand Down Expand Up @@ -162,6 +159,7 @@ def _df_to_obj(self, df:pd.DataFrame):
render_func_name = Unicode("baked").tag(sync=True)


dataflow_klass = PandasCustomizableDataflow
sampling_klass = PdSampling
autocleaning_klass = PandasAutocleaning #override the base CustomizableDataFlow klass
DFStatsClass = DfStatsV2 # Pandas Specific
Expand Down
60 changes: 24 additions & 36 deletions buckaroo/dataflow/dataflow.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from abc import abstractmethod
from typing import List, Literal, Tuple, Type, TypedDict, Dict as TDict, Any as TAny, Union
from typing_extensions import override
import six
import warnings
import pandas as pd
from traitlets import Unicode, Any, observe, Dict

from buckaroo.pluggable_analysis_framework.col_analysis import ColAnalysis, SDType
from ..serialization_utils import pd_to_obj, sd_to_parquet_b64
from ..serialization_utils import sd_to_parquet_b64
from buckaroo.pluggable_analysis_framework.utils import (filter_analysis)
from buckaroo.pluggable_analysis_framework.df_stats_v2 import DfStatsV2
from .autocleaning import SentinelAutocleaning
from .dataflow_extras import (exception_protect, Sampling)
from .styling_core import (
Expand Down Expand Up @@ -92,7 +91,7 @@ def __init__(self, raw_df):



def _compute_sampled_df(self, raw_df:pd.DataFrame, sample_method:str):
def _compute_sampled_df(self, raw_df, sample_method):
if sample_method == "first":
return raw_df[:1]
return raw_df
Expand Down Expand Up @@ -185,7 +184,7 @@ def processed_sd(self) -> SDType:
return self.processed_result[1]
return {}

def _get_summary_sd(self, df:pd.DataFrame) -> Tuple[SDType, TAny]:
def _get_summary_sd(self, df) -> Tuple[SDType, TAny]:
analysis_klasses = self.analysis_klasses
if analysis_klasses == "foo":
return {'some-col': {'foo':8}}, {}
Expand Down Expand Up @@ -236,12 +235,15 @@ def _widget_config(self, change):

class CustomizableDataflow(DataFlow):
"""
This allows targetd extension and customization of DataFlow
    This allows targeted extension and customization of DataFlow.

This is an abstract base class — use PandasCustomizableDataflow or
PolarsCustomizableDataflow for concrete implementations.
"""
#analysis_klasses = [StylingAnalysis]
analysis_klasses: List[Type[ColAnalysis]] = [StylingAnalysis]
command_config = Dict({}).tag(sync=True)
DFStatsClass = DfStatsV2
DFStatsClass = None
sampling_klass = Sampling

df_display_klasses: TDict[str, Type[StylingAnalysis]] = {}
Expand Down Expand Up @@ -323,7 +325,8 @@ def setup_options_from_analysis(self):
empty_df_display_args[kls.df_display_name] = EMPTY_DF_DISPLAY_ARG


self.DFStatsClass.verify_analysis_objects(self.analysis_klasses)
if self.DFStatsClass is not None:
self.DFStatsClass.verify_analysis_objects(self.analysis_klasses)

self.post_processing_klasses = filter_analysis(self.analysis_klasses, "post_processing_method")

Expand Down Expand Up @@ -379,38 +382,20 @@ def run_code_generator(self, operations):
self.ac_obj.run_code_generator(operations)
    ### end code interpreter block

@override
def _compute_processed_result(self, cleaned_df:pd.DataFrame, post_processing_method:str) -> Tuple[pd.DataFrame, SDType]:
if post_processing_method == '':
return (cleaned_df, {})
else:
post_analysis = self.post_processing_klasses[post_processing_method]
try:
ret_df, sd = post_analysis.post_process_df(cleaned_df)
return (ret_df, sd)
except Exception as e:
return (self._build_error_dataframe(e), {})
@abstractmethod
def _compute_processed_result(self, cleaned_df, post_processing_method):
...

@abstractmethod
def _build_error_dataframe(self, e):
return pd.DataFrame({'err': [str(e)]})
...


### start summary stats block
#TAny closer to some error type
@override
def _get_summary_sd(self, processed_df:pd.DataFrame) -> Tuple[SDType, TDict[str, TAny]]:
stats = self.DFStatsClass(
processed_df,
self.analysis_klasses,
self.df_name, debug=self.debug)
sdf = stats.sdf
if stats.errs:
if self.debug:
raise Exception("Error executing analysis")
else:
return {}, stats.errs
else:
return sdf, {}
@abstractmethod
def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]:
...


# ### end summary stats block
Expand All @@ -422,13 +407,16 @@ def _sd_to_jsondf(self, sd:SDType):
"""
return sd_to_parquet_b64(sd)

def _df_to_obj(self, df:pd.DataFrame) -> TDict[str, TAny]:
return pd_to_obj(self.sampling_klass.serialize_sample(df))
@abstractmethod
def _df_to_obj(self, df) -> TDict[str, TAny]:
...

def add_analysis(self, analysis_klass:Type[ColAnalysis]) -> None:
"""
same as get_summary_sd, call whatever to set summary_sd and trigger further comps
"""
if self.DFStatsClass is None:
return

stats = self.DFStatsClass(
self.processed_df,
Expand Down
10 changes: 2 additions & 8 deletions buckaroo/dataflow/dataflow_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging


import pandas as pd

logger = logging.getLogger()

Expand All @@ -13,8 +12,6 @@
'summary_stats_key': 'empty'}


SENTINEL_DF_1 = pd.DataFrame({'foo' :[10, 20], 'bar' : ["asdf", "iii"]})
SENTINEL_DF_2 = pd.DataFrame({'col1' :[55, 55], 'col2': ["pppp", "333"]})

SENTINEL_COLUMN_CONFIG_1 = "ASDF"
SENTINEL_COLUMN_CONFIG_2 = "FOO-BAR"
Expand Down Expand Up @@ -65,18 +62,15 @@ def pre_stats_sample(kls, df):
print("Removing excess columns, found %d columns" % len(df.columns))
df = df[df.columns[:kls.max_columns]]
if kls.pre_limit and len(df) > kls.pre_limit:
sampled = df.sample(kls.pre_limit)
if isinstance(sampled, pd.DataFrame):
return sampled.sort_index()
return sampled
return df.sample(kls.pre_limit)
return df


@classmethod
def serialize_sample(kls, df):
if kls.serialize_limit and len(df) > kls.serialize_limit:
sampled = df.sample(kls.serialize_limit)
if isinstance(sampled, pd.DataFrame):
if hasattr(sampled, 'sort_index'):
return sampled.sort_index()
return sampled
return df
Expand Down
50 changes: 50 additions & 0 deletions buckaroo/dataflow/pandas_dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Tuple, Dict as TDict, Any as TAny

import pandas as pd
from typing_extensions import override

from buckaroo.pluggable_analysis_framework.col_analysis import SDType
from buckaroo.pluggable_analysis_framework.df_stats_v2 import DfStatsV2
from ..serialization_utils import pd_to_obj
from .dataflow import CustomizableDataflow


class PandasCustomizableDataflow(CustomizableDataflow):
    """Concrete pandas implementation of CustomizableDataflow.

    Fills in the abstract hooks with pandas-specific behavior:
    summary stats via DfStatsV2, pandas error frames, and widget
    payload serialization through pd_to_obj.
    """

    # Pandas-specific stats engine used by _get_summary_sd / add_analysis.
    DFStatsClass = DfStatsV2

    @override
    def _compute_processed_result(self, cleaned_df, post_processing_method):
        # Empty method name means "no post-processing": pass the frame
        # through with an empty summary-dict overlay.
        if post_processing_method == '':
            return cleaned_df, {}
        analysis = self.post_processing_klasses[post_processing_method]
        try:
            processed, summary = analysis.post_process_df(cleaned_df)
        except Exception as err:
            # Surface the failure as a visible one-row error frame
            # rather than crashing the dataflow.
            return self._build_error_dataframe(err), {}
        return processed, summary

    @override
    def _build_error_dataframe(self, e):
        # Single-row pandas frame whose 'err' column carries the message.
        return pd.DataFrame({'err': [str(e)]})

    @override
    def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]:
        stats = self.DFStatsClass(
            processed_df,
            self.analysis_klasses,
            self.df_name, debug=self.debug)
        summary = stats.sdf
        if not stats.errs:
            return summary, {}
        # Errors occurred: fail loudly in debug mode, otherwise hand the
        # error dict back alongside an empty summary.
        if self.debug:
            raise Exception("Error executing analysis")
        return {}, stats.errs

    @override
    def _df_to_obj(self, df) -> TDict[str, TAny]:
        # Down-sample before serializing so the payload stays bounded.
        sampled = self.sampling_klass.serialize_sample(df)
        return pd_to_obj(sampled)
53 changes: 53 additions & 0 deletions buckaroo/dataflow/polars_dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Tuple, Dict as TDict, Any as TAny

import pandas as pd
import polars as pl
from typing_extensions import override

from buckaroo.pluggable_analysis_framework.col_analysis import SDType
from buckaroo.pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2
from ..serialization_utils import pd_to_obj
from .dataflow import CustomizableDataflow


class PolarsCustomizableDataflow(CustomizableDataflow):
    """Concrete polars implementation of CustomizableDataflow.

    Fills in the abstract hooks with polars-specific behavior:
    summary stats via PlDfStatsV2, polars error frames, and widget
    payload serialization that converts to pandas for pd_to_obj.
    """

    # Polars-specific stats engine used by _get_summary_sd / add_analysis.
    DFStatsClass = PlDfStatsV2

    @override
    def _compute_processed_result(self, cleaned_df, post_processing_method):
        # Empty method name means "no post-processing": pass the frame
        # through with an empty summary-dict overlay.
        if post_processing_method == '':
            return cleaned_df, {}
        analysis = self.post_processing_klasses[post_processing_method]
        try:
            processed, summary = analysis.post_process_df(cleaned_df)
        except Exception as err:
            # Surface the failure as a visible one-row error frame
            # rather than crashing the dataflow.
            return self._build_error_dataframe(err), {}
        return processed, summary

    @override
    def _build_error_dataframe(self, e):
        # Single-row polars frame whose 'err' column carries the message.
        return pl.DataFrame({'err': [str(e)]})

    @override
    def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]:
        stats = self.DFStatsClass(
            processed_df,
            self.analysis_klasses,
            self.df_name, debug=self.debug)
        summary = stats.sdf
        if not stats.errs:
            return summary, {}
        # Errors occurred: fail loudly in debug mode, otherwise hand the
        # error dict back alongside an empty summary.
        if self.debug:
            raise Exception("Error executing analysis")
        return {}, stats.errs

    @override
    def _df_to_obj(self, df) -> TDict[str, TAny]:
        # Serialization goes through pd_to_obj, so a polars frame is
        # converted to pandas first; sampling happens on the pandas side.
        frame = df if isinstance(df, pd.DataFrame) else df.to_pandas()
        return pd_to_obj(self.sampling_klass.serialize_sample(frame))
15 changes: 3 additions & 12 deletions buckaroo/polars_buckaroo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from .pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2
from .pluggable_analysis_framework.polars_analysis_management import PlDfStats
from .customizations.pl_stats_v2 import PL_ANALYSIS_V2
from .serialization_utils import pd_to_obj, sd_to_parquet_b64
from .serialization_utils import sd_to_parquet_b64
from .customizations.styling import DefaultSummaryStatsStyling, DefaultMainStyling
from .customizations.pl_autocleaning_conf import NoCleaningConfPl
from .dataflow.dataflow import Sampling
from .dataflow.polars_dataflow import PolarsCustomizableDataflow
from .dataflow.autocleaning import PandasAutocleaning
from .dataflow.widget_extension_utils import configure_buckaroo

Expand Down Expand Up @@ -50,6 +51,7 @@ def make_origs(raw_df, cleaned_df, cleaning_sd):
class PolarsBuckarooWidget(BuckarooWidget):
"""TODO: Add docstring here
"""
dataflow_klass = PolarsCustomizableDataflow
analysis_klasses = local_analysis_klasses
autocleaning_klass = PandasAutocleaning #override the base CustomizableDataFlow klass
autoclean_conf = tuple([NoCleaningConfPl]) #override the base CustomizableDataFlow conf
Expand All @@ -60,17 +62,6 @@ def _sd_to_jsondf(self, sd):
"""Serialize summary stats dict as parquet-b64."""
return sd_to_parquet_b64(sd)

def _build_error_dataframe(self, e):
return pl.DataFrame({'err': [str(e)]})

def _df_to_obj(self, df):
# I want to this, but then row numbers are lost
#return pd_to_obj(self.sampling_klass.serialize_sample(df).to_pandas())
import pandas as pd
if isinstance(df, pd.DataFrame):
return pd_to_obj(self.sampling_klass.serialize_sample(df))
return pd_to_obj(self.sampling_klass.serialize_sample(df.to_pandas()))


def prepare_df_for_serialization(df:pl.DataFrame) -> pl.DataFrame:
# I don't like this copy. modify to keep the same data with different names
Expand Down
4 changes: 2 additions & 2 deletions buckaroo/server/data_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from buckaroo.serialization_utils import to_parquet, pd_to_obj, check_and_fix_df
from buckaroo.df_util import old_col_new_col, to_chars

from buckaroo.dataflow.dataflow import CustomizableDataflow
from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow
from buckaroo.dataflow.dataflow_extras import Sampling
from buckaroo.dataflow.autocleaning import PandasAutocleaning
from buckaroo.dataflow.styling_core import StylingAnalysis
Expand Down Expand Up @@ -36,7 +36,7 @@ def pre_stats_sample(kls, df):
return df


class ServerDataflow(CustomizableDataflow):
class ServerDataflow(PandasCustomizableDataflow):
"""Headless dataflow matching BuckarooInfiniteWidget's pipeline."""
sampling_klass = ServerSampling
autocleaning_klass = PandasAutocleaning
Expand Down
4 changes: 2 additions & 2 deletions scripts/smoke_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
def test_base():
"""Bare `pip install buckaroo` — pandas comes via fastparquet."""
import pandas as pd
from buckaroo.dataflow.dataflow import CustomizableDataflow
from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow
from buckaroo.dataflow.autocleaning import PandasAutocleaning
from buckaroo.customizations.pd_autoclean_conf import NoCleaningConf
from buckaroo.serialization_utils import pd_to_obj, to_parquet

df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Verify the full dataflow pipeline runs
class TestDataflow(CustomizableDataflow):
class TestDataflow(PandasCustomizableDataflow):
autocleaning_klass = PandasAutocleaning
autoclean_conf = tuple([NoCleaningConf])

Expand Down
Loading
Loading