Skip to content

Commit 41e025a

Browse files
committed
fix(query): fix retry/follow bugs and add byte guard
1 parent d521b52 commit 41e025a

3 files changed

Lines changed: 289 additions & 36 deletions

File tree

CHANGELOG.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- `hotdata.query.QueryApi`: enhanced query client that transparently retries
1313
HTTP 429 (`OVERLOADED`) admission shedding honoring `Retry-After`, and
14-
auto-follows truncated results to materialize the full row set with a
15-
configurable `max_auto_rows` guard (#688).
14+
auto-follows truncated results to materialize the full row set, guarded by
15+
configurable `max_auto_rows` (default 1M) and `max_auto_bytes` (default
16+
64 MiB) ceilings (#688).
17+
- `ResultError` base class for the result-lifecycle exceptions
18+
(`ResultFailedError`, `ResultTimeoutError`, `ResultTooLargeError`,
19+
`ResultIncompleteError`, `ResultUnavailableError`) so callers can catch them
20+
with a single `except`.
21+
22+
### Changed
23+
24+
- `from hotdata import QueryApi` / `ResultsApi` now resolve to the enhanced
25+
clients (429 retry + truncation auto-follow; Arrow IPC fetch) instead of the
26+
bare generated classes, so the default happy path gets the safe behavior the
27+
query contract needs. The raw generated classes remain importable from
28+
`hotdata.api.query_api` / `hotdata.api.results_api`.
1629

1730

1831
## [0.3.4] - 2026-06-15

hotdata/query.py

Lines changed: 155 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
from __future__ import annotations
3131

32+
import logging
3233
import random
3334
import time
3435
from dataclasses import dataclass
@@ -48,18 +49,38 @@
4849
_HTTP_TOO_MANY_REQUESTS = 429
4950
OVERLOADED_ERROR_CODE = "OVERLOADED"
5051

51-
# Default ceiling on rows auto-materialized into memory when following a
52-
# truncated result. Mirrors the server's own bounded-memory concern: large
53-
# enough that ordinary results follow transparently, small enough that an
54-
# accidental ``SELECT *`` over a huge table fails loudly instead of OOMing the
55-
# client. Pass ``max_auto_rows=None`` to opt into unbounded materialization.
52+
_log = logging.getLogger("hotdata.query")
53+
54+
# Default ceilings on what auto-follow will materialize into memory. Mirror the
55+
# server's own bounded-memory concern: large enough that ordinary results follow
56+
# transparently, small enough that an accidental ``SELECT *`` over a huge table
57+
# fails loudly instead of OOMing the client. The byte ceiling is a client-RAM
58+
# guard (an estimate from page content), distinct from the server's 8 MiB
59+
# inline-preview cap. Pass ``max_auto_rows=None`` / ``max_auto_bytes=None`` to
60+
# opt into unbounded materialization on that axis.
5661
DEFAULT_MAX_AUTO_ROWS = 1_000_000
62+
DEFAULT_MAX_AUTO_BYTES = 64 * 1024 * 1024
5763

5864
# Sentinel so a per-call ``max_auto_rows=None`` / ``max_auto_bytes=None`` (opt
5965
# into unbounded) is distinguishable from "not passed, use the instance default".
6066
_UNSET: Any = object()
6167

6268

69+
def _estimate_rows_bytes(batch: List[List[Any]]) -> int:
70+
"""Rough in-memory size of a page of rows, used only for the byte guard.
71+
72+
Sums stringified cell lengths plus small per-cell/per-row overhead. This is
73+
a conservative ceiling *signal*, not exact accounting — good enough to stop
74+
a wide result before it exhausts RAM, cheap enough to run per page.
75+
"""
76+
total = 0
77+
for row in batch:
78+
for cell in row:
79+
total += len(str(cell)) + 2
80+
total += 2
81+
return total
82+
83+
6384
@dataclass(frozen=True)
6485
class RetryPolicy:
6586
"""Controls 429 (``OVERLOADED``) retry behavior.
@@ -114,7 +135,17 @@ def __init__(self, *, attempts: int, reason: str, last_exc: ApiException) -> Non
114135
self.headers = getattr(last_exc, "headers", None)
115136

116137

117-
class ResultFailedError(Exception):
138+
class ResultError(Exception):
    """Base class for result-lifecycle errors raised while auto-following a
    truncated result. Catch this to handle any of them uniformly.

    Subclasses defined in this module: ``ResultFailedError``,
    ``ResultTimeoutError``, ``ResultIncompleteError``, ``ResultTooLargeError``
    and ``ResultUnavailableError``.

    Note ``ServerOverloadedError`` is intentionally *not* a ``ResultError`` — it
    is an :class:`~hotdata.exceptions.ApiException` raised during query
    submission, before any result exists.
    """
146+
147+
148+
class ResultFailedError(ResultError):
118149
"""Raised when a followed result reaches terminal status ``failed``."""
119150

120151
def __init__(self, *, result_id: str, error_message: Optional[str]) -> None:
@@ -126,7 +157,7 @@ def __init__(self, *, result_id: str, error_message: Optional[str]) -> None:
126157
)
127158

128159

129-
class ResultTimeoutError(Exception):
160+
class ResultTimeoutError(ResultError):
130161
"""Raised when a result does not reach ``ready`` within the poll deadline."""
131162

132163
def __init__(self, *, result_id: str, status: str, deadline_s: float) -> None:
@@ -139,25 +170,50 @@ def __init__(self, *, result_id: str, status: str, deadline_s: float) -> None:
139170
)
140171

141172

142-
class ResultTooLargeError(Exception):
143-
"""Raised when auto-follow would materialize more rows than the guard
144-
allows. Fetch the result incrementally via :mod:`hotdata.arrow` (Arrow
145-
streaming) or raise ``max_auto_rows`` if you truly want it all in memory.
173+
class ResultIncompleteError(ResultError):
    """Signal that pagination ended short of the known row total.

    Raised instead of silently handing back a partial result when the server
    returns no further rows before the known total has been reached.

    :param result_id: id of the result being paginated.
    :param fetched: number of rows retrieved before pagination stalled.
    :param expected: total number of rows the result was known to contain.
    """

    def __init__(self, *, result_id: str, fetched: int, expected: int) -> None:
        message = (
            f"Result {result_id} pagination stalled: fetched {fetched} of "
            f"{expected} rows before the server returned an empty page."
        )
        super().__init__(message)
        # Expose the details as attributes so callers can inspect them
        # without parsing the message.
        self.result_id = result_id
        self.fetched = fetched
        self.expected = expected
187+
188+
189+
class ResultTooLargeError(ResultError):
190+
"""Raised when auto-follow would materialize more than the guard allows,
191+
on either the row or byte axis. Stream the result instead via
192+
:meth:`hotdata.arrow.ResultsApi.stream_result_arrow`, or raise the guard.
193+
194+
``kind`` is ``"rows"`` or ``"bytes"``; ``observed`` is the offending count
195+
(row count or estimated bytes) and ``limit`` the ceiling it exceeded.
146196
"""
147197

148-
def __init__(self, *, result_id: str, total: Optional[int], limit: int) -> None:
198+
def __init__(self, *, result_id: str, kind: str, observed: int, limit: int) -> None:
149199
self.result_id = result_id
150-
self.total = total
200+
self.kind = kind
201+
self.observed = observed
151202
self.limit = limit
152-
total_desc = f"{total} rows" if total is not None else "an unknown number of rows"
203+
if kind == "bytes":
204+
desc = f"~{observed} bytes (limit {limit})"
205+
knob = "max_auto_bytes"
206+
else:
207+
desc = f"{observed} rows (limit {limit})"
208+
knob = "max_auto_rows"
153209
super().__init__(
154-
f"Result {result_id} has {total_desc}, exceeding the auto-materialize "
155-
f"limit of {limit}. Stream it with hotdata.arrow.ResultsApi, or pass a "
156-
f"higher (or None) max_auto_rows to override."
210+
f"Result {result_id} exceeds the auto-materialize limit: {desc}. "
211+
f"Stream it with hotdata.arrow.ResultsApi.stream_result_arrow, or "
212+
f"raise (or set to None) {knob} to override."
157213
)
158214

159215

160-
class ResultUnavailableError(Exception):
216+
class ResultUnavailableError(ResultError):
161217
"""Raised when a result is truncated but no ``result_id`` is available to
162218
follow (persistence failed — see the response ``warning`` field).
163219
"""
@@ -210,12 +266,14 @@ def __init__(
210266
poll: Optional[PollPolicy] = None,
211267
auto_follow: bool = True,
212268
max_auto_rows: Optional[int] = DEFAULT_MAX_AUTO_ROWS,
269+
max_auto_bytes: Optional[int] = DEFAULT_MAX_AUTO_BYTES,
213270
) -> None:
214271
super().__init__(api_client)
215272
self.retry = retry or RetryPolicy()
216273
self.poll = poll or PollPolicy()
217274
self.auto_follow = auto_follow
218275
self.max_auto_rows = max_auto_rows
276+
self.max_auto_bytes = max_auto_bytes
219277

220278
def query(
221279
self,
@@ -229,6 +287,7 @@ def query(
229287
*,
230288
auto_follow: Optional[bool] = None,
231289
max_auto_rows: Any = _UNSET,
290+
max_auto_bytes: Any = _UNSET,
232291
) -> QueryResponse:
233292
"""Execute a query, retrying on 429 and auto-following truncation.
234293
@@ -240,18 +299,27 @@ def query(
240299
result set (``truncated`` / ``total_row_count`` are left intact so the
241300
caller can still tell it *was* truncated).
242301
302+
An async submission (``async=true``) returns an ``AsyncQueryResponse``,
303+
which is passed through untouched — there is nothing to follow yet.
304+
243305
:param auto_follow: Override the instance default for this call. ``False``
244306
returns the bounded preview unchanged.
245307
:param max_auto_rows: Override the instance row guard for this call.
246308
``None`` opts into unbounded materialization.
309+
:param max_auto_bytes: Override the instance byte guard for this call.
310+
``None`` opts into unbounded materialization.
247311
:raises ServerOverloadedError: 429 retries exhausted.
248312
:raises ResultFailedError: the followed result failed.
249313
:raises ResultTimeoutError: the result never became ready.
250-
:raises ResultTooLargeError: the full result exceeds the row guard.
314+
:raises ResultTooLargeError: the full result exceeds a guard (rows/bytes).
315+
:raises ResultIncompleteError: pagination could not retrieve all rows.
251316
:raises ResultUnavailableError: truncated with no ``result_id`` to follow.
252317
"""
253318
follow = self.auto_follow if auto_follow is None else auto_follow
254-
guard = self.max_auto_rows if max_auto_rows is _UNSET else max_auto_rows
319+
row_guard = self.max_auto_rows if max_auto_rows is _UNSET else max_auto_rows
320+
byte_guard = (
321+
self.max_auto_bytes if max_auto_bytes is _UNSET else max_auto_bytes
322+
)
255323

256324
response = self._query_with_retry(
257325
query_request,
@@ -263,9 +331,13 @@ def query(
263331
_host_index=_host_index,
264332
)
265333

334+
# An async submission comes back as AsyncQueryResponse (no `truncated`);
335+
# there is no inline result to follow, so pass it straight through.
336+
if not isinstance(response, QueryResponse):
337+
return response
266338
if not follow or not response.truncated:
267339
return response
268-
return self._materialize_full(response, guard)
340+
return self._materialize_full(response, row_guard, byte_guard)
269341

270342
def wait_for_result(
271343
self,
@@ -347,14 +419,19 @@ def _backoff_delay(self, exc: ApiException, attempt: int) -> float:
347419
policy = self.retry
348420
retry_after = _parse_retry_after(getattr(exc, "headers", None))
349421
if retry_after is not None:
350-
base = retry_after
351-
else:
352-
base = policy.base_backoff_s * (2 ** (attempt - 1))
353-
jitter = base * policy.jitter * random.random()
354-
return min(base + jitter, policy.max_backoff_s)
422+
# The server told us exactly how long to wait — honor it. Add jitter
423+
# on top (never below) to desync retries onto the freed slot, and do
424+
# NOT cap with max_backoff_s: capping would dishonor a Retry-After
425+
# larger than the cap. The overall deadline budget is the only bound.
426+
return retry_after + retry_after * policy.jitter * random.random()
427+
base = policy.base_backoff_s * (2 ** (attempt - 1))
428+
return min(base + base * policy.jitter * random.random(), policy.max_backoff_s)
355429

356430
def _materialize_full(
357-
self, preview: QueryResponse, max_auto_rows: Optional[int]
431+
self,
432+
preview: QueryResponse,
433+
max_auto_rows: Optional[int],
434+
max_auto_bytes: Optional[int],
358435
) -> QueryResponse:
359436
if preview.result_id is None:
360437
raise ResultUnavailableError(warning=preview.warning)
@@ -363,13 +440,25 @@ def _materialize_full(
363440
self.wait_for_result(preview.result_id, results_api=results_api)
364441

365442
total = self._authoritative_total(preview)
443+
# Auto-follow does extra round-trips (poll + paginate) and materializes
444+
# the full result; log it so the hidden work behind one query() call is
445+
# observable without being noisy (info, not a warning).
446+
_log.info(
447+
"auto-following truncated result %s (%s rows) for query run %s",
448+
preview.result_id,
449+
total if total is not None else "unknown",
450+
preview.query_run_id,
451+
)
366452
if max_auto_rows is not None and total is not None and total > max_auto_rows:
367453
raise ResultTooLargeError(
368-
result_id=preview.result_id, total=total, limit=max_auto_rows
454+
result_id=preview.result_id,
455+
kind="rows",
456+
observed=total,
457+
limit=max_auto_rows,
369458
)
370459

371460
rows = self._fetch_all_rows(
372-
preview.result_id, total, max_auto_rows, results_api
461+
preview.result_id, total, max_auto_rows, max_auto_bytes, results_api
373462
)
374463
# Replace the bounded preview with the full row set. truncated /
375464
# total_row_count stay as the server reported them so the caller can
@@ -397,10 +486,12 @@ def _fetch_all_rows(
397486
result_id: str,
398487
total: Optional[int],
399488
max_auto_rows: Optional[int],
489+
max_auto_bytes: Optional[int],
400490
results_api: _ResultsApi,
401491
) -> List[List[Any]]:
402492
page_size = self.poll.page_size
403493
rows: List[List[Any]] = []
494+
byte_estimate = 0
404495
offset = 0
405496
while True:
406497
page = results_api.get_result(
@@ -410,27 +501,58 @@ def _fetch_all_rows(
410501
format=ResultsFormatQuery.JSON,
411502
)
412503
batch = page.rows or []
504+
505+
# Known total but the server returned nothing more: surface the gap
506+
# rather than silently returning a partial result.
507+
if total is not None and not batch and offset < total:
508+
raise ResultIncompleteError(
509+
result_id=result_id, fetched=offset, expected=total
510+
)
511+
413512
rows.extend(batch)
414-
# Enforce the guard during pagination too, in case the total was
513+
514+
# Enforce both guards during pagination, in case the total was
415515
# unknown up front (total_row_count null, query-run lookup failed).
416516
if max_auto_rows is not None and len(rows) > max_auto_rows:
417517
raise ResultTooLargeError(
418-
result_id=result_id, total=total, limit=max_auto_rows
518+
result_id=result_id,
519+
kind="rows",
520+
observed=len(rows),
521+
limit=max_auto_rows,
419522
)
523+
if max_auto_bytes is not None:
524+
byte_estimate += _estimate_rows_bytes(batch)
525+
if byte_estimate > max_auto_bytes:
526+
raise ResultTooLargeError(
527+
result_id=result_id,
528+
kind="bytes",
529+
observed=byte_estimate,
530+
limit=max_auto_bytes,
531+
)
532+
420533
offset += len(batch)
421-
if total is not None and offset >= total:
422-
break
534+
if total is not None:
535+
# When the total is known, completion is offset >= total; a
536+
# short (non-empty) page just means keep paging. Never stop on
537+
# page size, which is what previously dropped rows.
538+
if offset >= total:
539+
break
540+
continue
541+
# Total unknown: a short/empty page is the end of the stream.
423542
if len(batch) < page_size:
424543
break
425544
return rows
426545

427546

428547
__all__ = [
548+
"DEFAULT_MAX_AUTO_BYTES",
429549
"DEFAULT_MAX_AUTO_ROWS",
430550
"OVERLOADED_ERROR_CODE",
431551
"PollPolicy",
432552
"QueryApi",
553+
"ResultError",
433554
"ResultFailedError",
555+
"ResultIncompleteError",
434556
"ResultTimeoutError",
435557
"ResultTooLargeError",
436558
"ResultUnavailableError",

0 commit comments

Comments
 (0)