From 9f01c24c3893337b178b0269b4fd7f9f548f0cb7 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Mon, 15 Jun 2026 13:42:45 -0700 Subject: [PATCH 1/5] feat(query): retry 429 and auto-follow truncation --- .openapi-generator-ignore | 8 + CHANGELOG.md | 7 + hotdata/query.py | 439 ++++++++++++++++++++++++++++++++++++++ tests/test_query.py | 364 +++++++++++++++++++++++++++++++ 4 files changed, 818 insertions(+) create mode 100644 hotdata/query.py create mode 100644 tests/test_query.py diff --git a/.openapi-generator-ignore b/.openapi-generator-ignore index 9518392..94ee40c 100644 --- a/.openapi-generator-ignore +++ b/.openapi-generator-ignore @@ -2,7 +2,15 @@ git_push.sh README.md setup.py + +# Hand-written enhancement modules layered over the generated client. They live +# at the package root (outside hotdata/api and hotdata/models) so the generator +# never emits or overwrites them, but they are listed here as the source of +# truth for "hand-maintained, don't touch": _auth.py (JWT exchange), arrow.py +# (Arrow IPC result fetch), query.py (429 retry + truncation auto-follow, #688). hotdata/_auth.py +hotdata/arrow.py +hotdata/query.py # Hand-written test for the patched ApiClient.close()/context-manager behavior # (re-applied by scripts/patch_api_client_close.py). It lives in the generated diff --git a/CHANGELOG.md b/CHANGELOG.md index 51ffefc..a52aa14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `hotdata.query.QueryApi`: enhanced query client that transparently retries + HTTP 429 (`OVERLOADED`) admission shedding honoring `Retry-After`, and + auto-follows truncated results to materialize the full row set with a + configurable `max_auto_rows` guard (#688). + ## [0.3.4] - 2026-06-15 diff --git a/hotdata/query.py b/hotdata/query.py new file mode 100644 index 0000000..8c133ee --- /dev/null +++ b/hotdata/query.py @@ -0,0 +1,439 @@ +"""High-level helpers for ``POST /v1/query`` (#640 / #688). + +The auto-generated :class:`hotdata.api.query_api.QueryApi` speaks the raw +contract; this module wraps it with a thin subclass that absorbs two behaviors +the bounded-memory query contract introduces, transparently: + +* **429 admission-shedding retry.** Under concurrent load the server may shed a + query with HTTP 429 + ``Retry-After`` (error code ``OVERLOADED``; PR #686 + turns the admission cap on by default). :meth:`QueryApi.query` retries + automatically — honoring ``Retry-After`` when present, otherwise bounded + exponential backoff with jitter — under an overall deadline budget. When the + budget or retry count is exhausted it raises :class:`ServerOverloadedError`, + which is distinct from a generic 503 so callers can branch on overload. + +* **Transparent truncation auto-follow.** A large result comes back with + ``truncated=True`` and only a bounded preview in ``rows``; the full result is + persisted out-of-band under ``result_id`` (#640). :meth:`QueryApi.query` + polls that result to ``ready`` and materializes the full row set into the + returned :class:`~hotdata.models.query_response.QueryResponse`. A configurable + ``max_auto_rows`` guard means an unbounded result is never silently pulled + into client memory — past the guard it raises :class:`ResultTooLargeError`, + pointing callers at the streaming Arrow API (:mod:`hotdata.arrow`). + +Use this ``QueryApi`` in place of :class:`hotdata.api.query_api.QueryApi`; every +other method on the base class works unchanged. Auto-follow and retry are both +configurable per-instance (constructor) and per-call (keyword arguments), and +auto-follow can be turned off entirely to get just the preview. +""" + +from __future__ import annotations + +import random +import time +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple, Union + +from hotdata.api.query_api import QueryApi as _GeneratedQueryApi +from hotdata.api.query_runs_api import QueryRunsApi as _QueryRunsApi +from hotdata.api.results_api import ResultsApi as _ResultsApi +from hotdata.exceptions import ApiException +from hotdata.models.query_request import QueryRequest +from hotdata.models.query_response import QueryResponse +from hotdata.models.results_format_query import ResultsFormatQuery + +# HTTP 429: too many concurrent queries (admission shedding). The server tags +# the body with error code OVERLOADED; we key retry off the status code since +# 429 is unambiguous and the body is not always parsed before we see it. +_HTTP_TOO_MANY_REQUESTS = 429 +OVERLOADED_ERROR_CODE = "OVERLOADED" + +# Default ceiling on rows auto-materialized into memory when following a +# truncated result. Mirrors the server's own bounded-memory concern: large +# enough that ordinary results follow transparently, small enough that an +# accidental ``SELECT *`` over a huge table fails loudly instead of OOMing the +# client. Pass ``max_auto_rows=None`` to opt into unbounded materialization. +DEFAULT_MAX_AUTO_ROWS = 1_000_000 + +# Sentinel so a per-call ``max_auto_rows=None`` (opt into unbounded) is +# distinguishable from "not passed, use the instance default". +_UNSET: Any = object() + + +@dataclass(frozen=True) +class RetryPolicy: + """Controls 429 (``OVERLOADED``) retry behavior. + + :param max_retries: Maximum retry attempts after the initial request. + :param base_backoff_s: Base delay for exponential backoff when the server + sends no ``Retry-After`` header. + :param max_backoff_s: Cap on any single backoff delay. + :param deadline_s: Overall budget across all attempts; once exceeded (or a + computed delay would push past it) retries stop. + :param jitter: Fraction of the base delay added as random jitter, to avoid + a thundering herd of retries hitting the freed admission slot at once. + """ + + max_retries: int = 5 + base_backoff_s: float = 0.5 + max_backoff_s: float = 30.0 + deadline_s: float = 120.0 + jitter: float = 0.5 + + +@dataclass(frozen=True) +class PollPolicy: + """Controls result-lifecycle polling and full-result pagination. + + :param base_backoff_s: Initial poll interval; doubles each poll up to + ``max_backoff_s``. + :param max_backoff_s: Cap on the poll interval. + :param deadline_s: Overall budget for a result to reach ``ready``. + :param page_size: Rows fetched per page when paginating the full result. + """ + + base_backoff_s: float = 0.5 + max_backoff_s: float = 5.0 + deadline_s: float = 120.0 + page_size: int = 50_000 + + +class ServerOverloadedError(ApiException): + """Raised when ``POST /v1/query`` is shed with HTTP 429 (``OVERLOADED``, + #686) and the retry budget or attempt count is exhausted. + + Distinct from a generic 503 / ``RESOURCE_EXHAUSTED`` so callers can treat + transient admission shedding differently from a hard resource error. The + ``attempts`` attribute records how many requests were made. + """ + + def __init__(self, *, attempts: int, reason: str, last_exc: ApiException) -> None: + self.attempts = attempts + super().__init__(status=_HTTP_TOO_MANY_REQUESTS, reason=reason) + self.body = getattr(last_exc, "body", None) + self.headers = getattr(last_exc, "headers", None) + + +class ResultFailedError(Exception): + """Raised when a followed result reaches terminal status ``failed``.""" + + def __init__(self, *, result_id: str, error_message: Optional[str]) -> None: + self.result_id = result_id + self.error_message = error_message + super().__init__( + f"Result {result_id} failed" + + (f": {error_message}" if error_message else "") + ) + + +class ResultTimeoutError(Exception): + """Raised when a result does not reach ``ready`` within the poll deadline.""" + + def __init__(self, *, result_id: str, status: str, deadline_s: float) -> None: + self.result_id = result_id + self.status = status + self.deadline_s = deadline_s + super().__init__( + f"Result {result_id} did not become ready within {deadline_s}s " + f"(last status={status!r})." + ) + + +class ResultTooLargeError(Exception): + """Raised when auto-follow would materialize more rows than the guard + allows. Fetch the result incrementally via :mod:`hotdata.arrow` (Arrow + streaming) or raise ``max_auto_rows`` if you truly want it all in memory. + """ + + def __init__(self, *, result_id: str, total: Optional[int], limit: int) -> None: + self.result_id = result_id + self.total = total + self.limit = limit + total_desc = f"{total} rows" if total is not None else "an unknown number of rows" + super().__init__( + f"Result {result_id} has {total_desc}, exceeding the auto-materialize " + f"limit of {limit}. Stream it with hotdata.arrow.ResultsApi, or pass a " + f"higher (or None) max_auto_rows to override." + ) + + +class ResultUnavailableError(Exception): + """Raised when a result is truncated but no ``result_id`` is available to + follow (persistence failed — see the response ``warning`` field). + """ + + def __init__(self, *, warning: Optional[str]) -> None: + self.warning = warning + super().__init__( + "Query result is truncated but no result_id is available to fetch " + "the full result" + + (f": {warning}" if warning else "") + + ". Re-run with auto_follow=False to use the preview." + ) + + +def _parse_retry_after(headers: Any) -> Optional[float]: + """Return the ``Retry-After`` delay in seconds, or ``None``. + + Honors the integer-seconds form (the server sends ``Retry-After: 1``). The + HTTP-date form is not emitted by this API, so it is intentionally ignored + rather than parsed. + """ + if not headers: + return None + # urllib3's HTTPHeaderDict is case-insensitive; a plain dict from a + # hand-built exception may not be, so try the canonical casing too. + value = None + getter = getattr(headers, "get", None) + if callable(getter): + value = headers.get("Retry-After") or headers.get("retry-after") + if value is None: + return None + try: + seconds = float(value) + except (TypeError, ValueError): + return None + return seconds if seconds >= 0 else None + + +class QueryApi(_GeneratedQueryApi): + """Drop-in replacement for :class:`hotdata.api.query_api.QueryApi` that adds + 429 retry and transparent truncation auto-follow. All base-class methods + work unchanged. + """ + + def __init__( + self, + api_client: Any = None, + *, + retry: Optional[RetryPolicy] = None, + poll: Optional[PollPolicy] = None, + auto_follow: bool = True, + max_auto_rows: Optional[int] = DEFAULT_MAX_AUTO_ROWS, + ) -> None: + super().__init__(api_client) + self.retry = retry or RetryPolicy() + self.poll = poll or PollPolicy() + self.auto_follow = auto_follow + self.max_auto_rows = max_auto_rows + + def query( + self, + query_request: QueryRequest, + x_database_id: Optional[str] = None, + _request_timeout: Union[None, float, Tuple[float, float]] = None, + _request_auth: Optional[Dict[str, Any]] = None, + _content_type: Optional[str] = None, + _headers: Optional[Dict[str, Any]] = None, + _host_index: int = 0, + *, + auto_follow: Optional[bool] = None, + max_auto_rows: Any = _UNSET, + ) -> QueryResponse: + """Execute a query, retrying on 429 and auto-following truncation. + + Behaves like the generated :meth:`query` but: + + * retries HTTP 429 (``OVERLOADED``) per the instance :class:`RetryPolicy`; + * when the response is ``truncated`` and ``auto_follow`` is on, polls the + out-of-band result to ``ready`` and replaces ``rows`` with the full + result set (``truncated`` / ``total_row_count`` are left intact so the + caller can still tell it *was* truncated). + + :param auto_follow: Override the instance default for this call. ``False`` + returns the bounded preview unchanged. + :param max_auto_rows: Override the instance row guard for this call. + ``None`` opts into unbounded materialization. + :raises ServerOverloadedError: 429 retries exhausted. + :raises ResultFailedError: the followed result failed. + :raises ResultTimeoutError: the result never became ready. + :raises ResultTooLargeError: the full result exceeds the row guard. + :raises ResultUnavailableError: truncated with no ``result_id`` to follow. + """ + follow = self.auto_follow if auto_follow is None else auto_follow + guard = self.max_auto_rows if max_auto_rows is _UNSET else max_auto_rows + + response = self._query_with_retry( + query_request, + x_database_id, + _request_timeout=_request_timeout, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index, + ) + + if not follow or not response.truncated: + return response + return self._materialize_full(response, guard) + + def wait_for_result( + self, + result_id: str, + *, + results_api: Optional[_ResultsApi] = None, + ) -> Any: + """Poll ``GET /v1/results/{id}`` until the result is ``ready``. + + Returns the ready :class:`~hotdata.models.get_result_response.GetResultResponse`. + Shared by auto-follow and available for direct use. + + :raises ResultFailedError: status reached ``failed``. + :raises ResultTimeoutError: ``ready`` not reached within the poll deadline. + """ + api = results_api or _ResultsApi(self.api_client) + policy = self.poll + deadline = time.monotonic() + policy.deadline_s + delay = policy.base_backoff_s + last_status = "pending" + while True: + result = api.get_result(result_id) + last_status = result.status + if last_status == "ready": + return result + if last_status == "failed": + raise ResultFailedError( + result_id=result_id, error_message=result.error_message + ) + # pending / processing: back off and poll again, within budget. + if time.monotonic() + delay > deadline: + raise ResultTimeoutError( + result_id=result_id, + status=last_status, + deadline_s=policy.deadline_s, + ) + time.sleep(delay) + delay = min(delay * 2, policy.max_backoff_s) + + # -- internals --------------------------------------------------------- + + def _query_with_retry(self, *args: Any, **kwargs: Any) -> QueryResponse: + policy = self.retry + deadline = time.monotonic() + policy.deadline_s + attempts = 0 + while True: + attempts += 1 + try: + return super().query(*args, **kwargs) + except ApiException as exc: + if exc.status != _HTTP_TOO_MANY_REQUESTS: + raise + if attempts > policy.max_retries: + raise ServerOverloadedError( + attempts=attempts, + reason=( + f"server overloaded: {policy.max_retries} retries " + "exhausted (HTTP 429 OVERLOADED)" + ), + last_exc=exc, + ) from exc + delay = self._backoff_delay(exc, attempts) + if time.monotonic() + delay > deadline: + raise ServerOverloadedError( + attempts=attempts, + reason=( + f"server overloaded: retry budget of {policy.deadline_s}s " + "exhausted (HTTP 429 OVERLOADED)" + ), + last_exc=exc, + ) from exc + time.sleep(delay) + + def _backoff_delay(self, exc: ApiException, attempt: int) -> float: + """Delay before the next retry: honor ``Retry-After`` when present, + else exponential backoff. Jitter is always added on top so retries do + not synchronize on the freed admission slot. + """ + policy = self.retry + retry_after = _parse_retry_after(getattr(exc, "headers", None)) + if retry_after is not None: + base = retry_after + else: + base = policy.base_backoff_s * (2 ** (attempt - 1)) + jitter = base * policy.jitter * random.random() + return min(base + jitter, policy.max_backoff_s) + + def _materialize_full( + self, preview: QueryResponse, max_auto_rows: Optional[int] + ) -> QueryResponse: + if preview.result_id is None: + raise ResultUnavailableError(warning=preview.warning) + + results_api = _ResultsApi(self.api_client) + self.wait_for_result(preview.result_id, results_api=results_api) + + total = self._authoritative_total(preview) + if max_auto_rows is not None and total is not None and total > max_auto_rows: + raise ResultTooLargeError( + result_id=preview.result_id, total=total, limit=max_auto_rows + ) + + rows = self._fetch_all_rows( + preview.result_id, total, max_auto_rows, results_api + ) + # Replace the bounded preview with the full row set. truncated / + # total_row_count stay as the server reported them so the caller can + # still see that the inline body had been truncated. + preview.rows = rows + if preview.total_row_count is None and total is not None: + preview.total_row_count = total + return preview + + def _authoritative_total(self, preview: QueryResponse) -> Optional[int]: + """The grand total row count. ``total_row_count`` is null while a + truncated result is still persisting, so fall back to the query-run + record, which carries the authoritative count once the run succeeds. + """ + if preview.total_row_count is not None: + return preview.total_row_count + try: + run = _QueryRunsApi(self.api_client).get_query_run(preview.query_run_id) + except ApiException: + return None + return run.row_count + + def _fetch_all_rows( + self, + result_id: str, + total: Optional[int], + max_auto_rows: Optional[int], + results_api: _ResultsApi, + ) -> List[List[Any]]: + page_size = self.poll.page_size + rows: List[List[Any]] = [] + offset = 0 + while True: + page = results_api.get_result( + result_id, + offset=offset, + limit=page_size, + format=ResultsFormatQuery.JSON, + ) + batch = page.rows or [] + rows.extend(batch) + # Enforce the guard during pagination too, in case the total was + # unknown up front (total_row_count null, query-run lookup failed). + if max_auto_rows is not None and len(rows) > max_auto_rows: + raise ResultTooLargeError( + result_id=result_id, total=total, limit=max_auto_rows + ) + offset += len(batch) + if total is not None and offset >= total: + break + if len(batch) < page_size: + break + return rows + + +__all__ = [ + "DEFAULT_MAX_AUTO_ROWS", + "OVERLOADED_ERROR_CODE", + "PollPolicy", + "QueryApi", + "ResultFailedError", + "ResultTimeoutError", + "ResultTooLargeError", + "ResultUnavailableError", + "RetryPolicy", + "ServerOverloadedError", +] diff --git a/tests/test_query.py b/tests/test_query.py new file mode 100644 index 0000000..2b511af --- /dev/null +++ b/tests/test_query.py @@ -0,0 +1,364 @@ +"""Unit tests for hotdata.query (#688). + +These stub the generated API methods (QueryApi.query, ResultsApi.get_result, +QueryRunsApi.get_query_run) so no server is needed, and patch time.sleep / +random.random so retry/poll timing is deterministic and instant. They cover the +behaviors the bounded-memory query contract requires: + +* 429 (OVERLOADED) retry: honor Retry-After, retry-then-succeed, exhaustion by + max-retries and by deadline budget, and non-429 pass-through. +* Truncation auto-follow: materialize the full row set, fall back to the + query-run total when total_row_count is null-while-persisting, the + max_auto_rows guard, failed results, missing result_id, and the opt-out. +* Forward-compatible deserialization: unknown response fields are ignored. +""" + +from __future__ import annotations + +from contextlib import ExitStack +from types import SimpleNamespace +from typing import Any, List, Optional +from unittest.mock import patch + +import pytest + +from hotdata.api.query_api import QueryApi as _GeneratedQueryApi +from hotdata.api.query_runs_api import QueryRunsApi as _QueryRunsApi +from hotdata.api.results_api import ResultsApi as _ResultsApi +from hotdata.exceptions import ApiException +from hotdata.models.get_result_response import GetResultResponse +from hotdata.models.query_request import QueryRequest +from hotdata.models.query_response import QueryResponse +from hotdata.query import ( + PollPolicy, + QueryApi, + ResultFailedError, + ResultTimeoutError, + ResultTooLargeError, + ResultUnavailableError, + RetryPolicy, + ServerOverloadedError, +) + + +REQ = QueryRequest(sql="SELECT 1 AS x") + + +def _api(**kwargs: Any) -> QueryApi: + # api_client is a dummy: every HTTP method is patched at the class level, so + # the instance's client is never actually exercised. + return QueryApi(api_client=object(), **kwargs) + + +def _preview( + *, + truncated: bool = True, + result_id: Optional[str] = "rslt1", + total: Optional[int] = None, + rows: Optional[List[List[Any]]] = None, +) -> QueryResponse: + rows = rows if rows is not None else [[1]] + return QueryResponse( + columns=["x"], + execution_time_ms=1, + nullable=[False], + preview_row_count=len(rows), + query_run_id="qrun1", + result_id=result_id, + row_count=len(rows), + rows=rows, + total_row_count=total, + truncated=truncated, + warning=None, + ) + + +def _result( + *, + status: str = "ready", + rows: Optional[List[List[Any]]] = None, + error_message: Optional[str] = None, +) -> GetResultResponse: + return GetResultResponse( + result_id="rslt1", status=status, rows=rows, error_message=error_message + ) + + +def _too_many(retry_after: Optional[str] = None) -> ApiException: + exc = ApiException(status=429, reason="Too Many Requests") + exc.headers = {"Retry-After": retry_after} if retry_after is not None else {} + return exc + + +# --- 429 retry ---------------------------------------------------------------- + + +def test_passthrough_when_not_truncated() -> None: + resp = _preview(truncated=False) + with patch.object(_GeneratedQueryApi, "query", return_value=resp) as q, \ + patch.object(_ResultsApi, "get_result") as gr: + out = _api().query(REQ, x_database_id="db1") + assert out is resp + assert q.call_count == 1 + gr.assert_not_called() # no follow when not truncated + + +def test_non_429_error_propagates_unchanged() -> None: + boom = ApiException(status=400, reason="Bad Request") + with patch.object(_GeneratedQueryApi, "query", side_effect=boom), \ + patch("hotdata.query.time.sleep") as sleep: + with pytest.raises(ApiException) as ei: + _api().query(REQ) + assert ei.value.status == 400 + assert not isinstance(ei.value, ServerOverloadedError) + sleep.assert_not_called() + + +def test_429_retry_then_succeed() -> None: + resp = _preview(truncated=False) + side = [_too_many("1"), _too_many("1"), resp] + with patch.object(_GeneratedQueryApi, "query", side_effect=side) as q, \ + patch("hotdata.query.time.sleep") as sleep, \ + patch("hotdata.query.random.random", return_value=0.0): + out = _api(retry=RetryPolicy(max_retries=5)).query(REQ) + assert out is resp + assert q.call_count == 3 + assert sleep.call_count == 2 + + +def test_429_honors_retry_after_header() -> None: + resp = _preview(truncated=False) + with patch.object(_GeneratedQueryApi, "query", side_effect=[_too_many("3"), resp]), \ + patch("hotdata.query.time.sleep") as sleep, \ + patch("hotdata.query.random.random", return_value=0.0): + _api(retry=RetryPolicy()).query(REQ) + # jitter fraction is 0.0 here, so the delay is exactly Retry-After. + sleep.assert_called_once_with(3.0) + + +def test_429_exhausts_max_retries() -> None: + with patch.object(_GeneratedQueryApi, "query", side_effect=_too_many("1")) as q, \ + patch("hotdata.query.time.sleep") as sleep, \ + patch("hotdata.query.random.random", return_value=0.0): + with pytest.raises(ServerOverloadedError) as ei: + _api(retry=RetryPolicy(max_retries=2, deadline_s=1000)).query(REQ) + assert ei.value.status == 429 + assert ei.value.attempts == 3 # initial + 2 retries + assert q.call_count == 3 + assert sleep.call_count == 2 + assert "retries exhausted" in str(ei.value) + + +def test_429_exhausts_deadline_budget() -> None: + with patch.object(_GeneratedQueryApi, "query", side_effect=_too_many("1")) as q, \ + patch("hotdata.query.time.sleep") as sleep, \ + patch("hotdata.query.random.random", return_value=0.0): + with pytest.raises(ServerOverloadedError) as ei: + # A zero budget means the first computed delay already overruns it. + _api(retry=RetryPolicy(max_retries=10, deadline_s=0.0)).query(REQ) + assert ei.value.attempts == 1 + assert q.call_count == 1 + sleep.assert_not_called() + assert "budget" in str(ei.value) + + +# --- truncation auto-follow --------------------------------------------------- + + +def _follow_patches( + stack: ExitStack, + *, + query_response: QueryResponse, + get_result_side_effect: Any, + run_row_count: Optional[int] = None, +) -> None: + stack.enter_context( + patch.object(_GeneratedQueryApi, "query", return_value=query_response) + ) + stack.enter_context( + patch.object(_ResultsApi, "get_result", side_effect=get_result_side_effect) + ) + stack.enter_context( + patch.object( + _QueryRunsApi, + "get_query_run", + return_value=SimpleNamespace(row_count=run_row_count), + ) + ) + stack.enter_context(patch("hotdata.query.time.sleep")) + + +def test_autofollow_materializes_full_rows() -> None: + # Inline preview has 1 row; the full result has 3. total_row_count is null + # while persisting, so the total comes from the query-run record. + preview = _preview(truncated=True, total=None, rows=[[1]]) + full = [[1], [2], [3]] + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + if format is None: # poll: wait_for_result + return _result(status="ready") + # paginate: single page covers all rows + return _result(status="ready", rows=full[offset:offset + limit]) + + with ExitStack() as stack: + _follow_patches( + stack, + query_response=preview, + get_result_side_effect=get_result, + run_row_count=3, + ) + out = _api(poll=PollPolicy(page_size=1000)).query(REQ, x_database_id="db1") + + assert out.rows == full + assert out.total_row_count == 3 # backfilled from the authoritative total + assert out.truncated is True # original server flag preserved + + +def test_autofollow_paginates_across_pages() -> None: + preview = _preview(truncated=True, total=5, rows=[[0]]) + full = [[0], [1], [2], [3], [4]] + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + if format is None: + return _result(status="ready") + return _result(status="ready", rows=full[offset:offset + limit]) + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + out = _api(poll=PollPolicy(page_size=2)).query(REQ) + + assert out.rows == full + + +def test_autofollow_polls_until_ready() -> None: + preview = _preview(truncated=True, total=1, rows=[[1]]) + statuses = iter(["processing", "processing", "ready"]) + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + if format is None: + return _result(status=next(statuses)) + return _result(status="ready", rows=[[1]]) + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + out = _api(poll=PollPolicy(base_backoff_s=0.01, deadline_s=100)).query(REQ) + + assert out.rows == [[1]] + + +def test_autofollow_disabled_returns_preview() -> None: + preview = _preview(truncated=True, rows=[[1]]) + with patch.object(_GeneratedQueryApi, "query", return_value=preview), \ + patch.object(_ResultsApi, "get_result") as gr: + out = _api().query(REQ, auto_follow=False) + assert out is preview + assert out.truncated is True + gr.assert_not_called() + + +def test_autofollow_failed_result_raises() -> None: + preview = _preview(truncated=True, total=3) + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + return _result(status="failed", error_message="planner exploded") + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + with pytest.raises(ResultFailedError) as ei: + _api().query(REQ) + assert ei.value.result_id == "rslt1" + assert "planner exploded" in str(ei.value) + + +def test_autofollow_guard_rejects_oversized_result() -> None: + preview = _preview(truncated=True, total=None) + calls: List[Any] = [] + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + calls.append(format) + return _result(status="ready") # only the poll should run + + with ExitStack() as stack: + _follow_patches( + stack, + query_response=preview, + get_result_side_effect=get_result, + run_row_count=10, + ) + with pytest.raises(ResultTooLargeError) as ei: + _api(max_auto_rows=5).query(REQ) + assert ei.value.total == 10 + assert ei.value.limit == 5 + # Guard trips before any JSON page is fetched (only the readiness poll ran). + assert all(fmt is None for fmt in calls) + + +def test_autofollow_unbounded_when_guard_disabled() -> None: + preview = _preview(truncated=True, total=3) + full = [[1], [2], [3]] + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + if format is None: + return _result(status="ready") + return _result(status="ready", rows=full[offset:offset + limit]) + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + out = _api(max_auto_rows=None, poll=PollPolicy(page_size=1000)).query(REQ) + assert out.rows == full + + +def test_autofollow_missing_result_id_raises() -> None: + preview = _preview(truncated=True, result_id=None) + preview.warning = "result persistence could not be initiated" + with patch.object(_GeneratedQueryApi, "query", return_value=preview): + with pytest.raises(ResultUnavailableError) as ei: + _api().query(REQ) + assert "result persistence could not be initiated" in str(ei.value) + + +def test_wait_for_result_times_out() -> None: + preview = _preview(truncated=True, total=1) + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + return _result(status="processing") + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + with pytest.raises(ResultTimeoutError) as ei: + _api(poll=PollPolicy(base_backoff_s=0.01, deadline_s=0.0)).query(REQ) + assert ei.value.result_id == "rslt1" + assert ei.value.status == "processing" + + +# --- forward-compatible deserialization -------------------------------------- + + +def test_unknown_response_fields_are_ignored() -> None: + # A future additive contract field must not break an older SDK build. + payload = { + "columns": ["x"], + "execution_time_ms": 1, + "nullable": [False], + "preview_row_count": 1, + "query_run_id": "qrun1", + "row_count": 1, + "rows": [[1]], + "truncated": False, + "some_future_field": {"nested": [1, 2, 3]}, + } + resp = QueryResponse.from_dict(payload) + assert resp is not None + assert resp.truncated is False + assert resp.preview_row_count == 1 + assert not hasattr(resp, "some_future_field") From d521b52a5281d238cc8512e38a12a8473408d9d8 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Mon, 15 Jun 2026 15:01:49 -0700 Subject: [PATCH 2/5] feat(sdk): default to enhanced query/results clients --- .github/workflows/regenerate.yml | 3 ++ hotdata/__init__.py | 8 +++++ scripts/patch_query_exports.py | 53 ++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+) create mode 100644 scripts/patch_query_exports.py diff --git a/.github/workflows/regenerate.yml b/.github/workflows/regenerate.yml index 4b6ac1d..c00f8cc 100644 --- a/.github/workflows/regenerate.yml +++ b/.github/workflows/regenerate.yml @@ -206,6 +206,9 @@ jobs: - name: Patch ApiClient close lifecycle run: python3 scripts/patch_api_client_close.py + - name: Patch default client exports (enhanced query/results) + run: python3 scripts/patch_query_exports.py + - name: Verify JWT-exchange code survived regeneration run: | python3 - <<'PY' diff --git a/hotdata/__init__.py b/hotdata/__init__.py index 0f70de1..a2a3804 100644 --- a/hotdata/__init__.py +++ b/hotdata/__init__.py @@ -348,3 +348,11 @@ from hotdata.models.workspace_detail import WorkspaceDetail as WorkspaceDetail from hotdata.models.workspace_list_item import WorkspaceListItem as WorkspaceListItem + +# --- hand-applied: prefer the enhanced clients over the generated ones +# (re-applied by scripts/patch_query_exports.py after regeneration). +# hotdata.query.QueryApi adds 429 retry + truncation auto-follow; +# hotdata.arrow.ResultsApi adds Arrow IPC result fetch. The raw generated +# classes remain importable from hotdata.api.query_api / hotdata.api.results_api. +from hotdata.query import QueryApi as QueryApi # noqa: E402,F811 +from hotdata.arrow import ResultsApi as ResultsApi # noqa: E402,F811 diff --git a/scripts/patch_query_exports.py b/scripts/patch_query_exports.py new file mode 100644 index 0000000..22c534b --- /dev/null +++ b/scripts/patch_query_exports.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +"""Re-apply enhanced default client exports after OpenAPI regeneration. + +The generated ``hotdata/__init__.py`` binds the top-level ``QueryApi`` and +``ResultsApi`` to the raw generated classes. We want ``from hotdata import +QueryApi`` / ``ResultsApi`` to resolve to the hand-written enhanced clients +instead — ``hotdata.query.QueryApi`` (429 retry + truncation auto-follow) and +``hotdata.arrow.ResultsApi`` (Arrow IPC fetch) — so the obvious happy path gets +the safe behavior the #640/#688 query contract needs, rather than the bare +client. The raw generated classes stay importable from ``hotdata.api.*``. + +``hotdata/__init__.py`` is regenerated, so this override is appended (after all +generated imports, so every name it references is defined) and re-applied by +regenerate.yml on each regen. Idempotent: a no-op if already present. +""" + +from __future__ import annotations + +import pathlib +import sys + +ROOT = pathlib.Path(__file__).resolve().parents[1] + +MARKER = "patch_query_exports.py" + +OVERRIDE = ( + "\n\n" + "# --- hand-applied: prefer the enhanced clients over the generated ones\n" + "# (re-applied by scripts/patch_query_exports.py after regeneration).\n" + "# hotdata.query.QueryApi adds 429 retry + truncation auto-follow;\n" + "# hotdata.arrow.ResultsApi adds Arrow IPC result fetch. The raw generated\n" + "# classes remain importable from hotdata.api.query_api / hotdata.api.results_api.\n" + "from hotdata.query import QueryApi as QueryApi # noqa: E402,F811\n" + "from hotdata.arrow import ResultsApi as ResultsApi # noqa: E402,F811\n" +) + + +def patch_init() -> None: + path = ROOT / "hotdata" / "__init__.py" + src = path.read_text() + if MARKER in src: + return + if "import QueryApi as QueryApi" not in src: + sys.exit(f"Failed to patch {path}: generated QueryApi export not found") + path.write_text(src.rstrip("\n") + "\n" + OVERRIDE) + + +def main() -> None: + patch_init() + + +if __name__ == "__main__": + main() From 41e025a011d8133fd3277c0554f2dfe1ed5df1d8 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Mon, 15 Jun 2026 15:01:49 -0700 Subject: [PATCH 3/5] fix(query): fix retry/follow bugs and add byte guard --- CHANGELOG.md | 17 +++- hotdata/query.py | 188 ++++++++++++++++++++++++++++++++++++-------- tests/test_query.py | 120 +++++++++++++++++++++++++++- 3 files changed, 289 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a52aa14..dbcc281 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `hotdata.query.QueryApi`: enhanced query client that transparently retries HTTP 429 (`OVERLOADED`) admission shedding honoring `Retry-After`, and - auto-follows truncated results to materialize the full row set with a - configurable `max_auto_rows` guard (#688). + auto-follows truncated results to materialize the full row set, guarded by + configurable `max_auto_rows` (default 1M) and `max_auto_bytes` (default + 64 MiB) ceilings (#688). +- `ResultError` base class for the result-lifecycle exceptions + (`ResultFailedError`, `ResultTimeoutError`, `ResultTooLargeError`, + `ResultIncompleteError`, `ResultUnavailableError`) so callers can catch them + with a single `except`. + +### Changed + +- `from hotdata import QueryApi` / `ResultsApi` now resolve to the enhanced + clients (429 retry + truncation auto-follow; Arrow IPC fetch) instead of the + bare generated classes, so the default happy path gets the safe behavior the + query contract needs. The raw generated classes remain importable from + `hotdata.api.query_api` / `hotdata.api.results_api`. ## [0.3.4] - 2026-06-15 diff --git a/hotdata/query.py b/hotdata/query.py index 8c133ee..a4c40a0 100644 --- a/hotdata/query.py +++ b/hotdata/query.py @@ -29,6 +29,7 @@ from __future__ import annotations +import logging import random import time from dataclasses import dataclass @@ -48,18 +49,38 @@ _HTTP_TOO_MANY_REQUESTS = 429 OVERLOADED_ERROR_CODE = "OVERLOADED" -# Default ceiling on rows auto-materialized into memory when following a -# truncated result. Mirrors the server's own bounded-memory concern: large -# enough that ordinary results follow transparently, small enough that an -# accidental ``SELECT *`` over a huge table fails loudly instead of OOMing the -# client. Pass ``max_auto_rows=None`` to opt into unbounded materialization. +_log = logging.getLogger("hotdata.query") + +# Default ceilings on what auto-follow will materialize into memory. Mirror the +# server's own bounded-memory concern: large enough that ordinary results follow +# transparently, small enough that an accidental ``SELECT *`` over a huge table +# fails loudly instead of OOMing the client. The byte ceiling is a client-RAM +# guard (an estimate from page content), distinct from the server's 8 MiB +# inline-preview cap. Pass ``max_auto_rows=None`` / ``max_auto_bytes=None`` to +# opt into unbounded materialization on that axis. DEFAULT_MAX_AUTO_ROWS = 1_000_000 +DEFAULT_MAX_AUTO_BYTES = 64 * 1024 * 1024 # Sentinel so a per-call ``max_auto_rows=None`` (opt into unbounded) is # distinguishable from "not passed, use the instance default". _UNSET: Any = object() +def _estimate_rows_bytes(batch: List[List[Any]]) -> int: + """Rough in-memory size of a page of rows, used only for the byte guard. + + Sums stringified cell lengths plus small per-cell/per-row overhead. This is + a conservative ceiling *signal*, not exact accounting — good enough to stop + a wide result before it exhausts RAM, cheap enough to run per page. + """ + total = 0 + for row in batch: + for cell in row: + total += len(str(cell)) + 2 + total += 2 + return total + + @dataclass(frozen=True) class RetryPolicy: """Controls 429 (``OVERLOADED``) retry behavior. @@ -114,7 +135,17 @@ def __init__(self, *, attempts: int, reason: str, last_exc: ApiException) -> Non self.headers = getattr(last_exc, "headers", None) -class ResultFailedError(Exception): +class ResultError(Exception): + """Base class for result-lifecycle errors raised while auto-following a + truncated result. Catch this to handle any of them uniformly. + + Note ``ServerOverloadedError`` is intentionally *not* a ``ResultError`` — it + is an :class:`~hotdata.exceptions.ApiException` raised during query + submission, before any result exists. + """ + + +class ResultFailedError(ResultError): """Raised when a followed result reaches terminal status ``failed``.""" def __init__(self, *, result_id: str, error_message: Optional[str]) -> None: @@ -126,7 +157,7 @@ def __init__(self, *, result_id: str, error_message: Optional[str]) -> None: ) -class ResultTimeoutError(Exception): +class ResultTimeoutError(ResultError): """Raised when a result does not reach ``ready`` within the poll deadline.""" def __init__(self, *, result_id: str, status: str, deadline_s: float) -> None: @@ -139,25 +170,50 @@ def __init__(self, *, result_id: str, status: str, deadline_s: float) -> None: ) -class ResultTooLargeError(Exception): - """Raised when auto-follow would materialize more rows than the guard - allows. Fetch the result incrementally via :mod:`hotdata.arrow` (Arrow - streaming) or raise ``max_auto_rows`` if you truly want it all in memory. +class ResultIncompleteError(ResultError): + """Raised when pagination cannot retrieve the full result — the server + returned no further rows before the known total was reached. Surfaced + instead of silently returning a partial result. + """ + + def __init__(self, *, result_id: str, fetched: int, expected: int) -> None: + self.result_id = result_id + self.fetched = fetched + self.expected = expected + super().__init__( + f"Result {result_id} pagination stalled: fetched {fetched} of " + f"{expected} rows before the server returned an empty page." + ) + + +class ResultTooLargeError(ResultError): + """Raised when auto-follow would materialize more than the guard allows, + on either the row or byte axis. Stream the result instead via + :meth:`hotdata.arrow.ResultsApi.stream_result_arrow`, or raise the guard. + + ``kind`` is ``"rows"`` or ``"bytes"``; ``observed`` is the offending count + (row count or estimated bytes) and ``limit`` the ceiling it exceeded. """ - def __init__(self, *, result_id: str, total: Optional[int], limit: int) -> None: + def __init__(self, *, result_id: str, kind: str, observed: int, limit: int) -> None: self.result_id = result_id - self.total = total + self.kind = kind + self.observed = observed self.limit = limit - total_desc = f"{total} rows" if total is not None else "an unknown number of rows" + if kind == "bytes": + desc = f"~{observed} bytes (limit {limit})" + knob = "max_auto_bytes" + else: + desc = f"{observed} rows (limit {limit})" + knob = "max_auto_rows" super().__init__( - f"Result {result_id} has {total_desc}, exceeding the auto-materialize " - f"limit of {limit}. Stream it with hotdata.arrow.ResultsApi, or pass a " - f"higher (or None) max_auto_rows to override." + f"Result {result_id} exceeds the auto-materialize limit: {desc}. " + f"Stream it with hotdata.arrow.ResultsApi.stream_result_arrow, or " + f"raise (or set to None) {knob} to override." ) -class ResultUnavailableError(Exception): +class ResultUnavailableError(ResultError): """Raised when a result is truncated but no ``result_id`` is available to follow (persistence failed — see the response ``warning`` field). """ @@ -210,12 +266,14 @@ def __init__( poll: Optional[PollPolicy] = None, auto_follow: bool = True, max_auto_rows: Optional[int] = DEFAULT_MAX_AUTO_ROWS, + max_auto_bytes: Optional[int] = DEFAULT_MAX_AUTO_BYTES, ) -> None: super().__init__(api_client) self.retry = retry or RetryPolicy() self.poll = poll or PollPolicy() self.auto_follow = auto_follow self.max_auto_rows = max_auto_rows + self.max_auto_bytes = max_auto_bytes def query( self, @@ -229,6 +287,7 @@ def query( *, auto_follow: Optional[bool] = None, max_auto_rows: Any = _UNSET, + max_auto_bytes: Any = _UNSET, ) -> QueryResponse: """Execute a query, retrying on 429 and auto-following truncation. @@ -240,18 +299,27 @@ def query( result set (``truncated`` / ``total_row_count`` are left intact so the caller can still tell it *was* truncated). + An async submission (``async=true``) returns an ``AsyncQueryResponse``, + which is passed through untouched — there is nothing to follow yet. + :param auto_follow: Override the instance default for this call. ``False`` returns the bounded preview unchanged. :param max_auto_rows: Override the instance row guard for this call. ``None`` opts into unbounded materialization. + :param max_auto_bytes: Override the instance byte guard for this call. + ``None`` opts into unbounded materialization. :raises ServerOverloadedError: 429 retries exhausted. :raises ResultFailedError: the followed result failed. :raises ResultTimeoutError: the result never became ready. - :raises ResultTooLargeError: the full result exceeds the row guard. + :raises ResultTooLargeError: the full result exceeds a guard (rows/bytes). + :raises ResultIncompleteError: pagination could not retrieve all rows. :raises ResultUnavailableError: truncated with no ``result_id`` to follow. """ follow = self.auto_follow if auto_follow is None else auto_follow - guard = self.max_auto_rows if max_auto_rows is _UNSET else max_auto_rows + row_guard = self.max_auto_rows if max_auto_rows is _UNSET else max_auto_rows + byte_guard = ( + self.max_auto_bytes if max_auto_bytes is _UNSET else max_auto_bytes + ) response = self._query_with_retry( query_request, @@ -263,9 +331,13 @@ def query( _host_index=_host_index, ) + # An async submission comes back as AsyncQueryResponse (no `truncated`); + # there is no inline result to follow, so pass it straight through. + if not isinstance(response, QueryResponse): + return response if not follow or not response.truncated: return response - return self._materialize_full(response, guard) + return self._materialize_full(response, row_guard, byte_guard) def wait_for_result( self, @@ -347,14 +419,19 @@ def _backoff_delay(self, exc: ApiException, attempt: int) -> float: policy = self.retry retry_after = _parse_retry_after(getattr(exc, "headers", None)) if retry_after is not None: - base = retry_after - else: - base = policy.base_backoff_s * (2 ** (attempt - 1)) - jitter = base * policy.jitter * random.random() - return min(base + jitter, policy.max_backoff_s) + # The server told us exactly how long to wait — honor it. Add jitter + # on top (never below) to desync retries onto the freed slot, and do + # NOT cap with max_backoff_s: capping would dishonor a Retry-After + # larger than the cap. The overall deadline budget is the only bound. + return retry_after + retry_after * policy.jitter * random.random() + base = policy.base_backoff_s * (2 ** (attempt - 1)) + return min(base + base * policy.jitter * random.random(), policy.max_backoff_s) def _materialize_full( - self, preview: QueryResponse, max_auto_rows: Optional[int] + self, + preview: QueryResponse, + max_auto_rows: Optional[int], + max_auto_bytes: Optional[int], ) -> QueryResponse: if preview.result_id is None: raise ResultUnavailableError(warning=preview.warning) @@ -363,13 +440,25 @@ def _materialize_full( self.wait_for_result(preview.result_id, results_api=results_api) total = self._authoritative_total(preview) + # Auto-follow does extra round-trips (poll + paginate) and materializes + # the full result; log it so the hidden work behind one query() call is + # observable without being noisy (info, not a warning). + _log.info( + "auto-following truncated result %s (%s rows) for query run %s", + preview.result_id, + total if total is not None else "unknown", + preview.query_run_id, + ) if max_auto_rows is not None and total is not None and total > max_auto_rows: raise ResultTooLargeError( - result_id=preview.result_id, total=total, limit=max_auto_rows + result_id=preview.result_id, + kind="rows", + observed=total, + limit=max_auto_rows, ) rows = self._fetch_all_rows( - preview.result_id, total, max_auto_rows, results_api + preview.result_id, total, max_auto_rows, max_auto_bytes, results_api ) # Replace the bounded preview with the full row set. truncated / # total_row_count stay as the server reported them so the caller can @@ -397,10 +486,12 @@ def _fetch_all_rows( result_id: str, total: Optional[int], max_auto_rows: Optional[int], + max_auto_bytes: Optional[int], results_api: _ResultsApi, ) -> List[List[Any]]: page_size = self.poll.page_size rows: List[List[Any]] = [] + byte_estimate = 0 offset = 0 while True: page = results_api.get_result( @@ -410,27 +501,58 @@ def _fetch_all_rows( format=ResultsFormatQuery.JSON, ) batch = page.rows or [] + + # Known total but the server returned nothing more: surface the gap + # rather than silently returning a partial result. + if total is not None and not batch and offset < total: + raise ResultIncompleteError( + result_id=result_id, fetched=offset, expected=total + ) + rows.extend(batch) - # Enforce the guard during pagination too, in case the total was + + # Enforce both guards during pagination, in case the total was # unknown up front (total_row_count null, query-run lookup failed). if max_auto_rows is not None and len(rows) > max_auto_rows: raise ResultTooLargeError( - result_id=result_id, total=total, limit=max_auto_rows + result_id=result_id, + kind="rows", + observed=len(rows), + limit=max_auto_rows, ) + if max_auto_bytes is not None: + byte_estimate += _estimate_rows_bytes(batch) + if byte_estimate > max_auto_bytes: + raise ResultTooLargeError( + result_id=result_id, + kind="bytes", + observed=byte_estimate, + limit=max_auto_bytes, + ) + offset += len(batch) - if total is not None and offset >= total: - break + if total is not None: + # When the total is known, completion is offset >= total; a + # short (non-empty) page just means keep paging. Never stop on + # page size, which is what previously dropped rows. + if offset >= total: + break + continue + # Total unknown: a short/empty page is the end of the stream. if len(batch) < page_size: break return rows __all__ = [ + "DEFAULT_MAX_AUTO_BYTES", "DEFAULT_MAX_AUTO_ROWS", "OVERLOADED_ERROR_CODE", "PollPolicy", "QueryApi", + "ResultError", "ResultFailedError", + "ResultIncompleteError", "ResultTimeoutError", "ResultTooLargeError", "ResultUnavailableError", diff --git a/tests/test_query.py b/tests/test_query.py index 2b511af..8874dd6 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -26,13 +26,16 @@ from hotdata.api.query_runs_api import QueryRunsApi as _QueryRunsApi from hotdata.api.results_api import ResultsApi as _ResultsApi from hotdata.exceptions import ApiException +from hotdata.models.async_query_response import AsyncQueryResponse from hotdata.models.get_result_response import GetResultResponse from hotdata.models.query_request import QueryRequest from hotdata.models.query_response import QueryResponse from hotdata.query import ( PollPolicy, QueryApi, + ResultError, ResultFailedError, + ResultIncompleteError, ResultTimeoutError, ResultTooLargeError, ResultUnavailableError, @@ -293,7 +296,8 @@ def get_result(result_id, offset=None, limit=None, format=None, **kw): ) with pytest.raises(ResultTooLargeError) as ei: _api(max_auto_rows=5).query(REQ) - assert ei.value.total == 10 + assert ei.value.kind == "rows" + assert ei.value.observed == 10 assert ei.value.limit == 5 # Guard trips before any JSON page is fetched (only the readiness poll ran). assert all(fmt is None for fmt in calls) @@ -362,3 +366,117 @@ def test_unknown_response_fields_are_ignored() -> None: assert resp.truncated is False assert resp.preview_row_count == 1 assert not hasattr(resp, "some_future_field") + + +# --- bug fixes ---------------------------------------------------------------- + + +def test_async_response_passes_through_untouched() -> None: + # var_async=True returns AsyncQueryResponse (no .truncated). The enhanced + # query() must return it untouched, not AttributeError on response.truncated. + async_resp = AsyncQueryResponse( + query_run_id="qrun1", status="running", status_url="/v1/query-runs/qrun1" + ) + with patch.object(_GeneratedQueryApi, "query", return_value=async_resp) as q, \ + patch.object(_ResultsApi, "get_result") as gr: + out = _api().query(REQ, x_database_id="db1") + assert out is async_resp + assert q.call_count == 1 + gr.assert_not_called() # no auto-follow attempted on an async submission + + +def test_retry_after_not_capped_by_max_backoff() -> None: + # Retry-After larger than max_backoff_s must be honored, not shrunk to the + # cap — the server told us exactly how long to wait. + resp = _preview(truncated=False) + with patch.object(_GeneratedQueryApi, "query", side_effect=[_too_many("60"), resp]), \ + patch("hotdata.query.time.sleep") as sleep, \ + patch("hotdata.query.random.random", return_value=0.0): + _api(retry=RetryPolicy(max_backoff_s=30.0, deadline_s=1000)).query(REQ) + sleep.assert_called_once_with(60.0) + + +def test_pagination_raises_on_premature_empty_page() -> None: + # total is known (5) but the server returns no more rows after 2 — must + # raise rather than silently returning a partial result. + preview = _preview(truncated=True, total=5, rows=[[0]]) + partial = [[0], [1]] + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + if format is None: + return _result(status="ready") + return _result(status="ready", rows=partial[offset:offset + limit]) + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + with pytest.raises(ResultIncompleteError) as ei: + _api(poll=PollPolicy(page_size=2)).query(REQ) + assert ei.value.expected == 5 + assert ei.value.fetched == 2 + + +# --- byte guard + exception hierarchy ---------------------------------------- + + +def test_byte_guard_trips_before_row_guard() -> None: + # Few rows, but each is wide — the byte guard must catch it even though the + # row count stays well under max_auto_rows. + preview = _preview(truncated=True, total=3, rows=[["x"]]) + wide = [["A" * 10_000], ["B" * 10_000], ["C" * 10_000]] + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + if format is None: + return _result(status="ready") + return _result(status="ready", rows=wide[offset:offset + limit]) + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + with pytest.raises(ResultTooLargeError) as ei: + _api( + max_auto_rows=1_000_000, + max_auto_bytes=15_000, + poll=PollPolicy(page_size=1), + ).query(REQ) + assert ei.value.kind == "bytes" + assert ei.value.limit == 15_000 + + +def test_result_errors_share_a_base() -> None: + # Callers should be able to catch every lifecycle error with one except. + preview = _preview(truncated=True, total=3) + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + return _result(status="failed", error_message="boom") + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + with pytest.raises(ResultError): # ResultFailedError is a ResultError + _api().query(REQ) + assert issubclass(ResultFailedError, ResultError) + assert issubclass(ResultTimeoutError, ResultError) + assert issubclass(ResultTooLargeError, ResultError) + assert issubclass(ResultUnavailableError, ResultError) + assert issubclass(ResultIncompleteError, ResultError) + + +# --- default export ---------------------------------------------------------- + + +def test_enhanced_clients_are_the_default_export() -> None: + # `from hotdata import QueryApi` / `ResultsApi` must resolve to the enhanced + # clients (retry + auto-follow / Arrow), not the bare generated ones, so the + # obvious happy path gets the safe behavior. (Patched by patch_query_exports.) + import hotdata + from hotdata.arrow import ResultsApi as ArrowResultsApi + from hotdata.query import QueryApi as EnhancedQueryApi + + assert hotdata.QueryApi is EnhancedQueryApi + assert hotdata.ResultsApi is ArrowResultsApi + # The raw generated classes stay reachable on their explicit path. + assert _GeneratedQueryApi is not EnhancedQueryApi From 692e46caeff6d3e999313125b5bcb03cd4a58676 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Mon, 15 Jun 2026 15:03:25 -0700 Subject: [PATCH 4/5] ci: run unit tests on every PR --- .github/workflows/integration-tests.yml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index ef49d56..71e3c70 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -57,6 +57,25 @@ jobs: print(f"All {len(scenarios)} scenarios have corresponding test files.") PY + # Unit tests need no server or secrets (they stub the transport / generated + # methods), so they run on every PR including forks. This is where the + # hand-written behavior (Arrow fetch, auth, 429 retry + truncation auto-follow) + # is verified — the integration job only covers tests/integration, and the + # truncation/429 contract can't be exercised against prod until it deploys. + unit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 + with: + python-version: '3.12' + - name: Install package and test deps + run: | + pip install --quiet -r requirements.txt -r test-requirements.txt + pip install --quiet -e . + - name: Run unit tests + run: pytest test tests --ignore=tests/integration -v + # Integration tests run against production. Skipped automatically by the # conftest if HOTDATA_SDK_TEST_API_KEY / HOTDATA_SDK_TEST_WORKSPACE_ID aren't # set (e.g. PRs from forks where secrets aren't injected). From 0de2c6161f2c0c4aaf5d78c9b07a83187cbe4cf4 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Mon, 15 Jun 2026 15:13:38 -0700 Subject: [PATCH 5/5] fix(query): handle 409 failed results, poll status-only --- hotdata/query.py | 23 +++++++++++++++++++++-- tests/test_query.py | 37 +++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/hotdata/query.py b/hotdata/query.py index a4c40a0..53555a1 100644 --- a/hotdata/query.py +++ b/hotdata/query.py @@ -350,7 +350,12 @@ def wait_for_result( Returns the ready :class:`~hotdata.models.get_result_response.GetResultResponse`. Shared by auto-follow and available for direct use. - :raises ResultFailedError: status reached ``failed``. + Polls with ``limit=0`` so the readiness check fetches status only — a + ``ready`` result would otherwise return its full (unbounded) row set on + every poll, materializing the whole result into memory before the size + guards can act. + + :raises ResultFailedError: the result failed (delivered as HTTP 409). :raises ResultTimeoutError: ``ready`` not reached within the poll deadline. """ api = results_api or _ResultsApi(self.api_client) @@ -359,11 +364,25 @@ def wait_for_result( delay = policy.base_backoff_s last_status = "pending" while True: - result = api.get_result(result_id) + try: + result = api.get_result(result_id, limit=0) + except ApiException as exc: + # A failed result is delivered as HTTP 409: the generated client + # raises (response_deserialize raises on any non-2xx) rather than + # returning status="failed". The GetResultResponse body — and its + # error_message — rides on exc.data. + if exc.status == 409: + data = getattr(exc, "data", None) + raise ResultFailedError( + result_id=result_id, + error_message=getattr(data, "error_message", None), + ) from exc + raise last_status = result.status if last_status == "ready": return result if last_status == "failed": + # Defensive: handle a failure also surfaced via a 2xx body. raise ResultFailedError( result_id=result_id, error_message=result.error_message ) diff --git a/tests/test_query.py b/tests/test_query.py index 8874dd6..1b6b60d 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -93,6 +93,15 @@ def _too_many(retry_after: Optional[str] = None) -> ApiException: return exc +def _failed_409(error_message: str = "boom") -> ApiException: + # A failed result is delivered as HTTP 409: the generated client raises + # ApiException (it raises on any non-2xx), with the GetResultResponse body + # — and its error_message — on exc.data. + exc = ApiException(status=409, reason="Conflict") + exc.data = _result(status="failed", error_message=error_message) + return exc + + # --- 429 retry ---------------------------------------------------------------- @@ -267,7 +276,8 @@ def test_autofollow_failed_result_raises() -> None: preview = _preview(truncated=True, total=3) def get_result(result_id, offset=None, limit=None, format=None, **kw): - return _result(status="failed", error_message="planner exploded") + # The readiness poll hits a failed result -> HTTP 409 ApiException. + raise _failed_409("planner exploded") with ExitStack() as stack: _follow_patches( @@ -276,9 +286,32 @@ def get_result(result_id, offset=None, limit=None, format=None, **kw): with pytest.raises(ResultFailedError) as ei: _api().query(REQ) assert ei.value.result_id == "rslt1" + assert ei.value.error_message == "planner exploded" assert "planner exploded" in str(ei.value) +def test_readiness_poll_fetches_status_only() -> None: + # The poll must not pull the full (unbounded) result into memory before the + # guards run — it asks for limit=0 and gets status only. + preview = _preview(truncated=True, total=2) + full = [[1], [2]] + poll_limits: List[Any] = [] + + def get_result(result_id, offset=None, limit=None, format=None, **kw): + if format is None: # readiness poll + poll_limits.append(limit) + return _result(status="ready") + return _result(status="ready", rows=full[offset:offset + limit]) + + with ExitStack() as stack: + _follow_patches( + stack, query_response=preview, get_result_side_effect=get_result + ) + out = _api(poll=PollPolicy(page_size=10)).query(REQ) + assert out.rows == full + assert poll_limits == [0] # status-only, not the default unbounded fetch + + def test_autofollow_guard_rejects_oversized_result() -> None: preview = _preview(truncated=True, total=None) calls: List[Any] = [] @@ -450,7 +483,7 @@ def test_result_errors_share_a_base() -> None: preview = _preview(truncated=True, total=3) def get_result(result_id, offset=None, limit=None, format=None, **kw): - return _result(status="failed", error_message="boom") + raise _failed_409("boom") with ExitStack() as stack: _follow_patches(