diff --git a/bigframes/core/col.py b/bigframes/core/col.py
index 60b24d5e83..d00d61365a 100644
--- a/bigframes/core/col.py
+++ b/bigframes/core/col.py
@@ -18,8 +18,10 @@
 
 import bigframes_vendored.pandas.core.col as pd_col
 
+from bigframes.core import agg_expressions, window_spec
 import bigframes.core.expression as bf_expression
 import bigframes.operations as bf_ops
+import bigframes.operations.aggregations as agg_ops
 
 
 # Not to be confused with the Expression class in `bigframes.core.expressions`
@@ -33,6 +35,15 @@ class Expression:
     def _apply_unary(self, op: bf_ops.UnaryOp) -> Expression:
         return Expression(op.as_expr(self._value))
 
+    def _apply_unary_agg(self, op: agg_ops.UnaryAggregateOp) -> Expression:
+        # Windowizing here is probably unnecessary in principle, but the block APIs expect pre-windowized expressions.
+        # TODO: col expressions will probably be used in windowed contexts later on; windowization must then be deferred
+        # instead of automatically applying the default unbound window.
+        agg_expr = op.as_expr(self._value)
+        return Expression(
+            agg_expressions.WindowExpression(agg_expr, window_spec.unbound())
+        )
+
     def _apply_binary(self, other: Any, op: bf_ops.BinaryOp, reverse: bool = False):
         if isinstance(other, Expression):
             other_value = other._value
@@ -118,6 +129,24 @@ def __rxor__(self, other: Any) -> Expression:
     def __invert__(self) -> Expression:
         return self._apply_unary(bf_ops.invert_op)
 
+    def sum(self) -> Expression:
+        return self._apply_unary_agg(agg_ops.sum_op)
+
+    def mean(self) -> Expression:
+        return self._apply_unary_agg(agg_ops.mean_op)
+
+    def var(self) -> Expression:
+        return self._apply_unary_agg(agg_ops.var_op)
+
+    def std(self) -> Expression:
+        return self._apply_unary_agg(agg_ops.std_op)
+
+    def min(self) -> Expression:
+        return self._apply_unary_agg(agg_ops.min_op)
+
+    def max(self) -> Expression:
+        return self._apply_unary_agg(agg_ops.max_op)
+
 
 def col(col_name: Hashable) -> Expression:
     return Expression(bf_expression.free_var(col_name))
diff --git a/tests/unit/test_col.py b/tests/unit/test_col.py
index 9c9088e037..c7a7eaa326 100644
--- a/tests/unit/test_col.py
+++ b/tests/unit/test_col.py
@@ -100,6 +100,57 @@ def test_pd_col_unary_operators(scalars_dfs, op):
     assert_frame_equal(bf_result, pd_result)
 
 
+@pytest.mark.parametrize(
+    ("op"),
+    [
+        (lambda x: x.sum()),
+        (lambda x: x.mean()),
+        (lambda x: x.min()),
+        (lambda x: x.max()),
+        (lambda x: x.std()),
+        (lambda x: x.var()),
+    ],
+    ids=[
+        "sum",
+        "mean",
+        "min",
+        "max",
+        "std",
+        "var",
+    ],
+)
+def test_pd_col_aggregate_op(scalars_dfs, op):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    bf_kwargs = {
+        "result": op(bpd.col("float64_col")),
+    }
+    pd_kwargs = {
+        "result": op(pd.col("float64_col")),  # type: ignore
+    }
+    df = scalars_df.assign(**bf_kwargs)
+
+    bf_result = df.to_pandas()
+    pd_result = scalars_pandas_df.assign(**pd_kwargs)
+
+    assert_frame_equal(bf_result, pd_result)
+
+
+def test_pd_col_aggregate_of_aggregate(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    bf_kwargs = {
+        "result": (bpd.col("int64_col") - bpd.col("int64_col").mean()).mean(),
+    }
+    pd_kwargs = {
+        "result": (pd.col("int64_col") - pd.col("int64_col").mean()).mean(),  # type: ignore
+    }
+    df = scalars_df.assign(**bf_kwargs)
+
+    bf_result = df.to_pandas()
+    pd_result = scalars_pandas_df.assign(**pd_kwargs)
+
+    assert_frame_equal(bf_result, pd_result)
+
+
 @pytest.mark.parametrize(
     ("op",),
     [