Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1289ee8
chore: bootstrap CHA-2956 connection pooling branch
mogita May 26, 2026
4be6ce8
feat: add 5-knob HTTP transport config (Stream/AsyncStream constructo…
mogita May 26, 2026
6315a38
feat: wire 5 pool knobs into httpx.Client/AsyncClient (limits + timeout)
mogita May 26, 2026
2209a8c
test: verify http_client escape hatch bypasses pool knob application
mogita May 26, 2026
7a1e183
test: verify per-call timeout override reaches httpx for sync and async
mogita May 26, 2026
3efc074
feat: log effective transport config on client construction (spec §8)
mogita May 26, 2026
56518c3
test: lock sync/async pool config parity (spec §5.3)
mogita May 26, 2026
b87c9fb
docs: changelog for 3.5.0 connection pooling; bump version
mogita May 26, 2026
7fbcf68
fix: _resolve_pool_knobs preserves explicit 0/0.0 via is-None checks
mogita May 26, 2026
3c5f546
style: wrap long ternary in _resolve_pool_knobs for ruff format
mogita May 27, 2026
9ce1772
test: update video example default-timeout assertions to 30.0
mogita May 27, 2026
697b64c
docs: remove internal Notion links and spec-section refs from custome…
mogita May 27, 2026
9336c1a
docs: unwrap hard-wrapped CHA-2956 prose into natural lines
mogita May 27, 2026
ecc7d1c
docs: replace em dashes with plain ASCII in CHA-2956 content
mogita May 27, 2026
cfe2b02
docs: replace em dashes with natural punctuation
mogita May 27, 2026
689bbf3
docs: use colon separator in CHANGELOG option list
mogita May 27, 2026
0759b65
fix: forward pool config in clone_for_token and as_async
mogita May 27, 2026
a960e63
fix: propagate pool config to sub-clients
mogita May 27, 2026
15124e0
fix: resolve pool env fallbacks without requiring STREAM_API_KEY
mogita May 27, 2026
78b0191
chore: revert generated test_webhook.py
mogita May 27, 2026
792abdc
fix: emit pool config INFO log once at top level
mogita May 27, 2026
70a660d
docs: remove em dashes from generated webhook files
mogita May 27, 2026
44d1307
test: use a pytest fixture for the clone/as_async pool-config tests
mogita May 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Explicit HTTP connection pool configuration ([CHA-2956](https://linear.app/stream/issue/CHA-2956/connection-pooling)).
Four new kwargs on `Stream(...)` and `AsyncStream(...)`:
- `max_conns_per_host: int`: default `5`
- `idle_timeout: float` (seconds): default `55.0`
- `connect_timeout: float` (seconds): default `10.0`
- `request_timeout: float` (seconds): default `30.0` (was `6.0`; see Behavior changes)

These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing `http_client=` and `transport=` kwargs continue to act as escape hatches; when `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`.
- INFO log on client construction (logger `getstream`) lists the effective pool config and whether a user-supplied `http_client` is in use.

- Webhook handling spec helpers (CHA-2961): `UnknownEvent` dataclass for
forward-compat; `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload`
primitives; `parse_event` (returns typed event or `UnknownEvent` for
unrecognized discriminators); `verify_signature` canonical alias of
`verify_webhook_signature`; `verify_and_parse_webhook` HTTP composite
(gunzip + verify + parse); `parse_sqs` and `parse_sns` queue composites
(no signature parameter queue transports are authenticated by AWS IAM,
(no signature parameter: queue transports are authenticated by AWS IAM,
so the backend emits no HMAC for queue messages today). Transparent gzip
via magic-byte detection.
- New instance methods on `Stream` and `AsyncStream`:
`verify_signature(body, signature)` and
`verify_and_parse_webhook(body, signature)` drop the api_secret parameter
`verify_and_parse_webhook(body, signature)` that drop the api_secret parameter
in favor of the client's stored secret. Dual API: the module-level functions
in `getstream.webhook` remain available for callers who want explicit
secret control.
Expand All @@ -35,10 +45,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns stream-py with the cross-SDK contract in CHA-2956. Existing callers using `timeout=` are unaffected; `timeout` is kept as an alias for `request_timeout`. Callers relying on the 6s ceiling for fail-fast behavior should pass `request_timeout=6.0` (or `timeout=6.0`) explicitly.
- Default HTTP transport now caps connections per host at `5` and closes idle sockets after `55.0s`. Previous default was httpx's `100` max-connections with `5.0s` keep-alive expiry.
- No breaking changes. All existing webhook helpers (`verify_webhook_signature`,
`parse_webhook_event`, `get_event_type`, event type constants) are preserved.

[Spec](https://www.notion.so/stream-wiki/Server-Side-SDK-Webhook-Handling-Spec-34b6a5d7f9f681e78003c443f227493c)
### Notes

- Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, `.post(...)`, etc., and pre-empts the client-level `request_timeout`.

## [3.0.0b1] - 2026-02-27

Expand Down
100 changes: 94 additions & 6 deletions getstream/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
import mimetypes
import os
import time
Expand Down Expand Up @@ -27,6 +28,51 @@
import ijson


# ── Connection pool defaults (CHA-2956) ──────────────────────────────
# Kept in sync with getstream.stream constants; duplicated here so BaseClient/AsyncBaseClient can be instantiated standalone (e.g. by sub-clients constructed directly without going through Stream/AsyncStream).
DEFAULT_MAX_CONNS_PER_HOST = 5
DEFAULT_IDLE_TIMEOUT = 55.0
DEFAULT_CONNECT_TIMEOUT = 10.0


logger = logging.getLogger("getstream")


def _resolve_pool_knobs(obj):
"""Pull the 3 pool knobs off ``obj`` if BaseStream has set them, else fall back to spec defaults. Top-level ``Stream``/``AsyncStream`` sets them on ``self`` before calling ``super().__init__()``, so a directly instantiated sub-client (or test fixture) still gets sane values.

`is None` (not truthiness) so an explicit `0` / `0.0` from the caller is preserved rather than silently swapped for a default.
"""
max_conns_per_host = getattr(obj, "max_conns_per_host", None)
idle_timeout = getattr(obj, "idle_timeout", None)
connect_timeout = getattr(obj, "connect_timeout", None)
return (
DEFAULT_MAX_CONNS_PER_HOST
if max_conns_per_host is None
else max_conns_per_host,
DEFAULT_IDLE_TIMEOUT if idle_timeout is None else idle_timeout,
DEFAULT_CONNECT_TIMEOUT if connect_timeout is None else connect_timeout,
)


def _log_pool_config(cfg, *, user_http_client: bool) -> None:
if user_http_client:
logger.info(
"getstream connection pool: user_http_client=True (5 knobs not applied)"
)
else:
logger.info(
"getstream connection pool: "
"max_conns_per_host=%s idle_timeout=%ss "
"connect_timeout=%ss request_timeout=%ss "
"user_http_client=False",
cfg.max_conns_per_host,
cfg.idle_timeout,
cfg.connect_timeout,
cfg.timeout,
)


def _read_file_bytes(file_path: str) -> bytes:
with open(file_path, "rb") as f:
return f.read()
Expand Down Expand Up @@ -151,12 +197,17 @@ def __init__(
timeout=None,
user_agent=None,
):
# The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) are set on ``self`` by BaseStream prior to this call when used via the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients constructed directly use the spec defaults via _resolve_pool_knobs.
max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self)
super().__init__(
api_key=api_key,
base_url=base_url,
token=token,
timeout=timeout,
user_agent=user_agent,
max_conns_per_host=max_conns_per_host,
idle_timeout=idle_timeout,
connect_timeout=connect_timeout,
)
http_client = getattr(self, "_http_client", None)
if http_client is not None:
Expand All @@ -173,23 +224,39 @@ def __init__(
self.client = http_client
self._owns_http_client = False
else:
limits = httpx.Limits(
max_connections=self.max_conns_per_host,
max_keepalive_connections=self.max_conns_per_host,
keepalive_expiry=self.idle_timeout,
)
timeout_obj = httpx.Timeout(
connect=self.connect_timeout,
read=self.timeout,
write=self.timeout,
pool=self.timeout,
)
transport = getattr(self, "_transport", None)
if transport is not None:
self.client = httpx.Client(
base_url=self.base_url or "",
headers={**self.headers, "Accept-Encoding": "gzip"},
params=self.params,
timeout=httpx.Timeout(self.timeout),
timeout=timeout_obj,
limits=limits,
transport=transport,
)
else:
self.client = httpx.Client(
base_url=self.base_url or "",
headers={**self.headers, "Accept-Encoding": "gzip"},
params=self.params,
timeout=httpx.Timeout(self.timeout),
timeout=timeout_obj,
limits=limits,
)
self._owns_http_client = True
# The pool-config INFO line is emitted once by BaseStream after the
# top-level client is built, not here, to avoid one line per
# sub-client construction.

def __enter__(self):
return self
Expand Down Expand Up @@ -376,7 +443,7 @@ def close(self):
Close HTTPX client.

If the client was provided externally via ``http_client``, this is a
no-op the caller that created the client is responsible for closing
no-op; the caller that created the client is responsible for closing
it.
"""
if getattr(self, "_owns_http_client", True):
Expand All @@ -392,12 +459,17 @@ def __init__(
timeout=None,
user_agent=None,
):
# The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) are set on ``self`` by BaseStream prior to this call when used via the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients constructed directly use the spec defaults via _resolve_pool_knobs.
max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self)
super().__init__(
api_key=api_key,
base_url=base_url,
token=token,
timeout=timeout,
user_agent=user_agent,
max_conns_per_host=max_conns_per_host,
idle_timeout=idle_timeout,
connect_timeout=connect_timeout,
)
http_client = getattr(self, "_http_client", None)
if http_client is not None:
Expand All @@ -414,23 +486,39 @@ def __init__(
self.client = http_client
self._owns_http_client = False
else:
limits = httpx.Limits(
max_connections=self.max_conns_per_host,
max_keepalive_connections=self.max_conns_per_host,
keepalive_expiry=self.idle_timeout,
)
timeout_obj = httpx.Timeout(
connect=self.connect_timeout,
read=self.timeout,
write=self.timeout,
pool=self.timeout,
)
transport = getattr(self, "_transport", None)
if transport is not None:
self.client = httpx.AsyncClient(
base_url=self.base_url or "",
headers={**self.headers, "Accept-Encoding": "gzip"},
params=self.params,
timeout=httpx.Timeout(self.timeout),
timeout=timeout_obj,
limits=limits,
transport=transport,
)
else:
self.client = httpx.AsyncClient(
base_url=self.base_url or "",
headers={**self.headers, "Accept-Encoding": "gzip"},
params=self.params,
timeout=httpx.Timeout(self.timeout),
timeout=timeout_obj,
limits=limits,
)
self._owns_http_client = True
# The pool-config INFO line is emitted once by BaseStream after the
# top-level client is built, not here, to avoid one line per
# sub-client construction.

async def __aenter__(self):
return self
Expand All @@ -442,7 +530,7 @@ async def aclose(self):
"""Close HTTPX async client (closes pools/keep-alives).

If the client was provided externally via ``http_client``, this is a
no-op the caller that created the client is responsible for closing
no-op; the caller that created the client is responsible for closing
it.
"""
if getattr(self, "_owns_http_client", True):
Expand Down
6 changes: 6 additions & 0 deletions getstream/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ def __init__(
anonymous=False,
timeout=None,
user_agent=None,
max_conns_per_host=None,
idle_timeout=None,
connect_timeout=None,
):
self.anonymous = anonymous
self.timeout = timeout
self.max_conns_per_host = max_conns_per_host
self.idle_timeout = idle_timeout
self.connect_timeout = connect_timeout

self.base_url = base_url
self.params = {"api_key": api_key}
Expand Down
Loading
Loading