
Add hydro_list_to_flat_df() helper for converting HydroServer .list() results into pandas DataFrames#27

Open
amabdallah wants to merge 4 commits into hydroserver2:main from amabdallah:main

@amabdallah

Summary

This PR adds a pandas-friendly helper, hydro_list_to_flat_df(), for converting HydroServer .list() responses into flat DataFrames.

The helper supports:

  • automatic pagination
  • optional flattening of dict-valued columns
  • optional numeric casting
  • selective default sorting for Things and Datastreams
  • fallback retry without sorting when an endpoint rejects order_by

This is intended to improve notebook, QA/QC, ETL, and analytics workflows while preserving the existing object-based API.
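The fallback behaviour listed above can be illustrated with a small, self-contained sketch. It does not use hydroserverpy or requests: `FakeValidationError`, `fake_units_list`, and `list_with_sort_fallback` are hypothetical stand-ins invented for this example, and the real helper checks for an HTTP 422 response rather than a custom exception.

```python
class FakeValidationError(Exception):
    """Hypothetical stand-in for the HTTP 422 error a list endpoint may raise."""
    status_code = 422


def fake_units_list(order_by=None):
    """Hypothetical endpoint that rejects every order_by value."""
    if order_by is not None:
        raise FakeValidationError("order_by not supported")
    return ["meter", "liter"]


def list_with_sort_fallback(list_fn, order_by):
    """Try the requested sort first, then retry once without sorting."""
    candidates = [order_by, None] if order_by is not None else [None]
    last_err = None
    for ob in candidates:
        kwargs = {} if ob is None else {"order_by": ob}
        try:
            return list_fn(**kwargs), ob
        except FakeValidationError as err:
            last_err = err  # only the 422-style error triggers a retry
    raise last_err


items, used_order = list_with_sort_fallback(fake_units_list, ["name"])
print(items, used_order)
```

In this sketch the sorted call fails, so the result comes from the unsorted retry and `used_order` is `None`. Any other exception type would propagate immediately, which mirrors the intent of retrying only on validation errors.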

Changes

  • added hydro_list_to_flat_df() to src/hydroserverpy/api/utils.py
  • exported the helper from src/hydroserverpy/__init__.py
  • added tests for basic conversion, flattening, numeric casting, and manual pagination
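As a rough illustration of what a manual-pagination test might exercise, the sketch below pages through a fake list function until the reported last page is reached. `FakePage`, `fake_paged_list`, and `fetch_all_pages` are hypothetical names made up for this example; they are not part of hydroserverpy, and the `items` and `total_pages` attributes are assumed from the script in this PR.

```python
from dataclasses import dataclass


@dataclass
class FakePage:
    """Hypothetical stand-in for the collection object returned by .list()."""
    items: list
    total_pages: int


def fake_paged_list(page=1, page_size=2):
    """Hypothetical list function that serves five records, two per page."""
    records = ["a", "b", "c", "d", "e"]
    start = (page - 1) * page_size
    total_pages = -(-len(records) // page_size)  # ceiling division
    return FakePage(items=records[start:start + page_size], total_pages=total_pages)


def fetch_all_pages(list_fn):
    """Request pages in order until the reported last page is reached."""
    items, page = [], 1
    while True:
        collection = list_fn(page=page)
        items.extend(collection.items)
        # Stop on the last page, or on an empty page as a safety net.
        if page >= collection.total_pages or not collection.items:
            break
        page += 1
    return items


all_items = fetch_all_pages(fake_paged_list)
print(all_items)
```

The empty-page check is a defensive choice in this sketch: it keeps the loop finite even if a collection reports a misleading page count.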

Notes

  • dict flattening is currently one level deep
  • list-valued fields remain as lists
  • pandas is imported inside the helper so DataFrame support remains optional for users who do not need it
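To make the first two notes concrete, here is a minimal pure-Python sketch of one-level flattening applied to a single row. `flatten_one_level` is a hypothetical function written for this example and is not the helper's implementation, which works on DataFrame columns; the field names are invented sample data.

```python
def flatten_one_level(row):
    """Lift the keys of dict-valued fields into the row, one level only."""
    flat = {}
    for key, value in row.items():
        if isinstance(value, dict):
            # Inner keys become columns; nested dicts and lists are kept as-is.
            flat.update(value)
        else:
            flat[key] = value
    return flat


row = {
    "name": "Site A",
    "tags": {"Status": "active", "extra": {"depth": 2}},
    "photos": ["a.jpg", "b.jpg"],
}
flat_row = flatten_one_level(row)
print(flat_row)
```

Here `Status` becomes its own field, the nested `extra` dict stays a dict, and `photos` stays a list. Note that this sketch silently overwrites a field when two keys collide, which is a simplification and may differ from how the helper behaves.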

@amabdallah
Author

Here is the script that inspired this pull request:

import pandas as pd
from uuid import UUID
import inspect
import requests


"""
MAIN FUNCTION OVERVIEW

This function retrieves data from HydroServer and converts it into a clean,
analysis-ready pandas DataFrame. It automatically fetches all available records
(across pages when supported), converts API objects into flat row-based data,
expands nested metadata fields (such as tags or properties) into regular columns,
and converts numeric values stored as text into proper numeric types.

Sorting behavior (important):
- order_by=["name"] is applied by default ONLY for Things and Datastreams.
- For all other endpoints, no default sorting is applied unless you explicitly pass order_by=...
- If sorting is attempted and the endpoint rejects the order_by value (422 validation error),
  the function retries once with no sorting (order_by=None). No alternative sort fields
  (e.g., "code") are substituted automatically.

In simple terms:
This function turns raw HydroServer API responses into a clean,
spreadsheet-ready table suitable for reporting, QA checks, merging,
and downstream data workflows.
"""


"""
LIMITATIONS / KNOWN BEHAVIOR:

- Dictionary-type columns (e.g., tags, properties) are flattened only one level deep.
  Deeply nested dictionaries may not be fully expanded.

- If two different dictionary fields contain the same key name
  (e.g., both 'tags' and 'properties' contain "Status"), or a key matches an
  existing column name, DataFrame.join raises a ValueError for the overlapping
  columns. Collisions are not handled.

- Automatic numeric casting converts an object column only when more than 90%
  of its non-null values parse as numbers; the values that do not parse become NaN.
  Columns below that threshold are left unchanged.

- List-type fields (arrays) are not exploded into multiple rows.
  They remain as lists within a single cell.

- Sorting fallback logic only applies when sorting is attempted
  (either by default for Things/Datastreams or when explicitly provided).

- The function assumes consistent API schema behavior across pages.
  If HydroServer schema changes significantly,
  column structure or data types may shift.
"""


def _to_serializable(v):
    if isinstance(v, UUID):
        return str(v)
    return v


def _item_to_dict(item):
    row = {}
    for attr, value in vars(item).items():
        if attr.startswith("_"):
            continue
        row[attr] = _to_serializable(value)
    return row


def _flatten_all_dict_columns(df):
    df_out = df.copy()

    for col in df_out.columns:
        if df_out[col].apply(lambda x: isinstance(x, dict)).any():
            mask = df_out[col].apply(lambda x: isinstance(x, dict))
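            # max_level=0 keeps flattening to one level, as documented above;
            # the default would also expand nested dicts into dotted column names.
            expanded = pd.json_normalize(df_out.loc[mask, col].tolist(), max_level=0)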
            expanded.index = df_out.loc[mask].index
            df_out = df_out.drop(columns=[col]).join(expanded, how="left")

    return df_out


def _auto_cast_numeric_columns(df):
    for col in df.columns:
        if df[col].dtype == "object":
            converted = pd.to_numeric(df[col], errors="coerce")
            non_null = df[col].notna().sum()
            numeric_success = converted.notna().sum()
            if non_null > 0 and numeric_success / non_null > 0.9:
                df[col] = converted
    return df


def hydro_list_to_flat_df(
    list_fn,
    fetch_all=True,
    flatten_dicts=True,
    order_by=None,
    auto_cast_numeric=True,
    # Apply default sort ONLY to these endpoints (detected from list_fn.__qualname__/__name__/repr)
    default_sort_endpoints=("things", "datastreams"),
    default_order_by_for_sorted_endpoints=("name",),
    **list_kwargs
):
    """
    HydroServer .list() -> DataFrame, with selective default sorting.

    Default sorting:
      - If order_by is NOT provided, the function applies order_by=["name"] ONLY when
        list_fn belongs to Things or Datastreams endpoints.
      - Otherwise it does not pass any order_by (unsorted) unless you specify it.

    Fallback:
      - If sorting is attempted and order_by triggers a 422 validation error,
        the function retries once with no sorting (order_by=None).
    """

    # -----------------------------
    # Decide whether to apply default order_by
    # -----------------------------
    if order_by is None:
        fn_id = (
            getattr(list_fn, "__qualname__", "") + " " +
            getattr(list_fn, "__name__", "") + " " +
            repr(list_fn)
        ).lower()

        should_default_sort = any(ep in fn_id for ep in default_sort_endpoints)

        if should_default_sort:
            order_by = [default_order_by_for_sorted_endpoints[0]]
        else:
            order_by = None

    # Build candidate order_by values to try:
    # - If sorting is requested (explicitly or by default), try it first,
    #   then retry once with no sorting.
    # - If no sorting is requested, only try no sorting.
    if order_by is not None:
        candidates = [order_by, None]
    else:
        candidates = [None]

    items = []
    collection = None
    last_err = None

    def _call_list(*, ob, **kwargs):
        call_kwargs = dict(kwargs)
        if ob is not None:
            call_kwargs["order_by"] = ob
        return list_fn(**call_kwargs)

    # --------------------------------------
    # Step 1: Fetch data from HydroServer
    # --------------------------------------
    if fetch_all:
        sig = inspect.signature(list_fn)
        supports_fetch_all = "fetch_all" in sig.parameters

        if supports_fetch_all:
            for ob in candidates:
                try:
                    collection = _call_list(ob=ob, fetch_all=True, **list_kwargs)
                    items = collection.items
                    break
                except requests.HTTPError as e:
                    last_err = e
                    resp = getattr(e, "response", None)
                    # Only fallback on validation-style errors (422). Otherwise re-raise.
                    if resp is None or resp.status_code != 422:
                        raise

            if collection is None:
                raise last_err

        else:
            # Manual pagination (also respects the same sort/no-sort candidates)
            for ob in candidates:
                try:
                    page = 1
                    items = []

                    while True:
                        collection = _call_list(ob=ob, page=page, **list_kwargs)
                        items.extend(collection.items)

                        # Stop on an empty page so the loop cannot run forever when
                        # the collection reports neither total_pages nor page_size.
                        if not collection.items:
                            break

                        total_pages = getattr(collection, "total_pages", None)
                        if total_pages and page >= total_pages:
                            break

                        page_size = getattr(collection, "page_size", None)
                        if total_pages is None and page_size and len(collection.items) < page_size:
                            break

                        page += 1

                    # Success for this candidate (ob)
                    break

                except requests.HTTPError as e:
                    last_err = e
                    resp = getattr(e, "response", None)
                    if resp is None or resp.status_code != 422:
                        raise

            if collection is None and last_err is not None:
                raise last_err

    else:
        # Single page only, still with fallback ordering
        for ob in candidates:
            try:
                collection = _call_list(ob=ob, **list_kwargs)
                items = collection.items
                break
            except requests.HTTPError as e:
                last_err = e
                resp = getattr(e, "response", None)
                if resp is None or resp.status_code != 422:
                    raise

        if collection is None:
            raise last_err

    # --------------------------------------
    # Step 2: Convert objects → DataFrame
    # --------------------------------------
    df = pd.DataFrame([_item_to_dict(i) for i in items])

    # --------------------------------------
    # Step 3: Expand nested dictionaries
    # --------------------------------------
    if flatten_dicts and not df.empty:
        df = _flatten_all_dict_columns(df)

    # --------------------------------------
    # Step 4: Convert numeric-like columns
    # --------------------------------------
    if auto_cast_numeric and not df.empty:
        df = _auto_cast_numeric_columns(df)

    return df


def check_for_duplicates(df):
    """
    Checks whether the DataFrame contains duplicate rows.

    Returns:
        "FYI Duplicates Found in the DF" if duplicates exist
        None if no duplicates are found
    """
    # Cells holding lists or dicts are unhashable, so compare string forms of the rows.
    if df.astype(str).duplicated().any():
        return "FYI Duplicates Found in the DF"
    return None


# Example showing how to use it (assumes hs_api is an already-connected client):
#
# df_things = hydro_list_to_flat_df(hs_api.things.list)  # default sorts by name (Things)
# message = check_for_duplicates(df_things)
# if message:
#     print(message)
#
# df_units = hydro_list_to_flat_df(hs_api.units.list)    # no default sorting (Units)