Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions dte_adj/local.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

import numpy as np
from typing import Tuple
from dte_adj.stratified import (
SimpleStratifiedDistributionEstimator,
AdjustedStratifiedDistributionEstimator,
)
from dte_adj.util import compute_ldte, compute_lpte
from dte_adj.util import ArrayLike, compute_ldte, compute_lpte, _convert_to_ndarray


class SimpleLocalDistributionEstimator(SimpleStratifiedDistributionEstimator):
Expand All @@ -28,25 +30,26 @@ def __init__(self):

def fit(
self,
covariates: np.ndarray,
treatment_arms: np.ndarray,
treatment_indicator: np.ndarray,
outcomes: np.ndarray,
strata: np.ndarray,
) -> "SimpleLocalDistributionEstimator":
covariates: ArrayLike,
treatment_arms: ArrayLike,
treatment_indicator: ArrayLike,
outcomes: ArrayLike,
strata: ArrayLike,
) -> SimpleLocalDistributionEstimator:
"""
Train the SimpleLocalDistributionEstimator.

Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): Treatment assignment variable (Z).
treatment_indicator (np.ndarray): Treatment indicator variable (D).
outcomes (np.ndarray): Scalar-valued observed outcome.
strata (np.ndarray): Stratum indicators.
covariates: Pre-treatment covariates.
treatment_arms: Treatment assignment variable (Z).
treatment_indicator: Treatment indicator variable (D).
outcomes: Scalar-valued observed outcome.
strata: Stratum indicators.

Returns:
SimpleLocalDistributionEstimator: The fitted estimator.
"""
treatment_indicator = _convert_to_ndarray(treatment_indicator)
super().fit(covariates, treatment_arms, outcomes, strata)
self.treatment_indicator = treatment_indicator

Expand Down Expand Up @@ -196,25 +199,26 @@ class AdjustedLocalDistributionEstimator(AdjustedStratifiedDistributionEstimator

def fit(
self,
covariates: np.ndarray,
treatment_arms: np.ndarray,
treatment_indicator: np.ndarray,
outcomes: np.ndarray,
strata: np.ndarray,
) -> "AdjustedLocalDistributionEstimator":
covariates: ArrayLike,
treatment_arms: ArrayLike,
treatment_indicator: ArrayLike,
outcomes: ArrayLike,
strata: ArrayLike,
) -> AdjustedLocalDistributionEstimator:
"""
Train the AdjustedLocalDistributionEstimator.

Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): Treatment assignment variable (Z).
treatment_indicator (np.ndarray): Treatment indicator variable (D).
outcomes (np.ndarray): Scalar-valued observed outcome.
strata (np.ndarray): Stratum indicators.
covariates: Pre-treatment covariates.
treatment_arms: Treatment assignment variable (Z).
treatment_indicator: Treatment indicator variable (D).
outcomes: Scalar-valued observed outcome.
strata: Stratum indicators.

Returns:
AdjustedLocalDistributionEstimator: The fitted estimator.
"""
treatment_indicator = _convert_to_ndarray(treatment_indicator)
super().fit(covariates, treatment_arms, outcomes, strata)
self.treatment_indicator = treatment_indicator

Expand Down
31 changes: 21 additions & 10 deletions dte_adj/simple.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import numpy as np
from dte_adj.stratified import (
SimpleStratifiedDistributionEstimator,
AdjustedStratifiedDistributionEstimator,
)
from dte_adj.util import ArrayLike, _convert_to_ndarray


class SimpleDistributionEstimator(SimpleStratifiedDistributionEstimator):
Expand Down Expand Up @@ -45,19 +48,23 @@ def __init__(self):
super().__init__()

def fit(
self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray
) -> "SimpleDistributionEstimator":
self, covariates: ArrayLike, treatment_arms: ArrayLike, outcomes: ArrayLike
) -> SimpleDistributionEstimator:
"""
Set parameters.

Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): The index of the treatment arm.
outcomes (np.ndarray): Scalar-valued observed outcome.
covariates: Pre-treatment covariates.
treatment_arms: The index of the treatment arm.
outcomes: Scalar-valued observed outcome.

Returns:
SimpleDistributionEstimator: The fitted estimator.
"""
covariates = _convert_to_ndarray(covariates)
treatment_arms = _convert_to_ndarray(treatment_arms)
outcomes = _convert_to_ndarray(outcomes)

if covariates.shape[0] != treatment_arms.shape[0]:
raise ValueError("The shape of covariates and treatment_arm should be same")

Expand Down Expand Up @@ -105,19 +112,23 @@ class AdjustedDistributionEstimator(AdjustedStratifiedDistributionEstimator):
"""

def fit(
self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray
) -> "AdjustedDistributionEstimator":
self, covariates: ArrayLike, treatment_arms: ArrayLike, outcomes: ArrayLike
) -> AdjustedDistributionEstimator:
"""
Set parameters.

Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): The index of the treatment arm.
outcomes (np.ndarray): Scalar-valued observed outcome.
covariates: Pre-treatment covariates.
treatment_arms: The index of the treatment arm.
outcomes: Scalar-valued observed outcome.

Returns:
AdjustedDistributionEstimator: The fitted estimator.
"""
covariates = _convert_to_ndarray(covariates)
treatment_arms = _convert_to_ndarray(treatment_arms)
outcomes = _convert_to_ndarray(outcomes)

if covariates.shape[0] != treatment_arms.shape[0]:
raise ValueError("The shape of covariates and treatment_arm should be same")

Expand Down
47 changes: 31 additions & 16 deletions dte_adj/stratified.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
from __future__ import annotations

import numpy as np
from typing import Tuple, Any
from copy import deepcopy
from dte_adj.base import DistributionEstimatorBase
from dte_adj.util import ArrayLike, _convert_to_ndarray


class SimpleStratifiedDistributionEstimator(DistributionEstimatorBase):
"""A class is for estimating the empirical distribution function and computing the Distributional parameters for CAR."""

def fit(
self,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.ndarray,
strata: np.ndarray,
) -> "DistributionEstimatorBase":
covariates: ArrayLike,
treatment_arms: ArrayLike,
outcomes: ArrayLike,
strata: ArrayLike,
) -> DistributionEstimatorBase:
"""
Train the DistributionEstimatorBase.

Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): The index of the treatment arm.
outcomes (np.ndarray): Scalar-valued observed outcome.
covariates: Pre-treatment covariates.
treatment_arms: The index of the treatment arm.
outcomes: Scalar-valued observed outcome.
strata: Stratum indicators.

Returns:
DistributionEstimatorBase: The fitted estimator.
"""
covariates = _convert_to_ndarray(covariates)
treatment_arms = _convert_to_ndarray(treatment_arms)
outcomes = _convert_to_ndarray(outcomes)
strata = _convert_to_ndarray(strata)

if covariates.shape[0] != treatment_arms.shape[0]:
raise ValueError("The shape of covariates and treatment_arm should be same")

Expand Down Expand Up @@ -168,22 +177,28 @@ def __init__(self, base_model: Any, folds=3, is_multi_task=False):

def fit(
self,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.ndarray,
strata: np.ndarray,
) -> "DistributionEstimatorBase":
covariates: ArrayLike,
treatment_arms: ArrayLike,
outcomes: ArrayLike,
strata: ArrayLike,
) -> DistributionEstimatorBase:
"""
Train the DistributionEstimatorBase.

Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): The index of the treatment arm.
outcomes (np.ndarray): Scalar-valued observed outcome.
covariates: Pre-treatment covariates.
treatment_arms: The index of the treatment arm.
outcomes: Scalar-valued observed outcome.
strata: Stratum indicators.

Returns:
DistributionEstimatorBase: The fitted estimator.
"""
covariates = _convert_to_ndarray(covariates)
treatment_arms = _convert_to_ndarray(treatment_arms)
outcomes = _convert_to_ndarray(outcomes)
strata = _convert_to_ndarray(strata)

if covariates.shape[0] != treatment_arms.shape[0]:
raise ValueError("The shape of covariates and treatment_arm should be same")

Expand Down
25 changes: 24 additions & 1 deletion dte_adj/util.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
from __future__ import annotations

import numpy as np
from scipy.stats import norm
from typing import Tuple, TYPE_CHECKING
from typing import Tuple, Union, TYPE_CHECKING

if TYPE_CHECKING:
import pandas as pd
import polars as pl

from dte_adj.local import (
SimpleStratifiedDistributionEstimator,
AdjustedLocalDistributionEstimator,
)

# Input types accepted wherever the estimators take array-like data.
# The pandas / polars members are string forward references.
ArrayLike = Union[
    np.ndarray,
    list,
    tuple,
    "pd.DataFrame",
    "pd.Series",
    "pl.DataFrame",
    "pl.Series",
]


def _convert_to_ndarray(data: ArrayLike) -> np.ndarray:
    """Convert array-like data to ``np.ndarray`` if needed.

    Args:
        data: A NumPy array, a list, a tuple, or any object exposing a
            callable ``to_numpy`` attribute (e.g. pandas / polars Series
            and DataFrame objects).

    Returns:
        np.ndarray: ``data`` itself when it already is an ndarray, otherwise
        the result of ``data.to_numpy()`` or ``np.asarray(data)``, always
        normalised to an ndarray.
    """
    if isinstance(data, np.ndarray):
        # Fast path: hand back the very same object, no copy.
        return data

    to_numpy = getattr(data, "to_numpy", None)
    if callable(to_numpy):
        # ``to_numpy`` is only duck-typed, so its return value is not
        # guaranteed to be an ndarray. ``np.asarray`` is a no-op when it is
        # (the pandas / polars case) and enforces the declared return type
        # when it is not.
        return np.asarray(to_numpy())

    # Lists, tuples and anything else NumPy knows how to interpret.
    return np.asarray(data)


def compute_confidence_intervals(
vec_y: np.ndarray,
Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ dev = [
"ruff>=0.12.2,<0.16.0",
"sphinx>=7.3.7,<8.2.0",
"scikit-learn>=1.5,<1.9",
"pre-commit>=4.0.1,<4.6.0"
"pre-commit>=4.0.1,<4.6.0",
"pandas>=2.0",
"polars>=1.0"
]

[tool.setuptools.packages.find]
Expand All @@ -47,7 +49,9 @@ dev-dependencies = [
"ruff>=0.12.2,<0.16.0",
"sphinx>=7.3.7,<8.2.0",
"scikit-learn>=1.5,<1.9",
"pre-commit>=4.0.1,<4.6.0"
"pre-commit>=4.0.1,<4.6.0",
"pandas>=2.0",
"polars>=1.0"
]

[tool.ruff.lint]
Expand Down
57 changes: 57 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import unittest
import numpy as np
import pandas as pd
import polars as pl
from dte_adj.util import _convert_to_ndarray


class TestConvertToNdarray(unittest.TestCase):
    """Verify that _convert_to_ndarray handles every supported array-like input."""

    def _assert_converts(self, data, expected):
        """Convert ``data`` and check both the result type and its contents."""
        converted = _convert_to_ndarray(data)
        self.assertIsInstance(converted, np.ndarray)
        np.testing.assert_array_equal(converted, expected)

    def test_ndarray(self):
        # A 1-D ndarray must come back with unchanged contents.
        source = np.array([1, 2, 3])
        self._assert_converts(source, source)

    def test_ndarray_2d(self):
        # Same check for a 2-D ndarray.
        source = np.array([[1, 2], [3, 4]])
        self._assert_converts(source, source)

    def test_pandas_series(self):
        self._assert_converts(pd.Series([1, 2, 3]), np.array([1, 2, 3]))

    def test_pandas_dataframe(self):
        # Columns "a" and "b" become the two columns of the resulting matrix.
        frame = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
        self._assert_converts(frame, np.array([[1, 3], [2, 4]]))

    def test_polars_series(self):
        self._assert_converts(pl.Series([1, 2, 3]), np.array([1, 2, 3]))

    def test_polars_dataframe(self):
        # Columns "a" and "b" become the two columns of the resulting matrix.
        frame = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
        self._assert_converts(frame, np.array([[1, 3], [2, 4]]))

    def test_list(self):
        self._assert_converts([1, 2, 3], np.array([1, 2, 3]))

    def test_tuple(self):
        self._assert_converts((1, 2, 3), np.array([1, 2, 3]))
Loading