Skip to content

feat: add general-purpose WebSocket client for real-time event consumption#236

Draft
aliev wants to merge 30 commits intomainfrom
feat/websocket-support
Draft

feat: add general-purpose WebSocket client for real-time event consumption#236
aliev wants to merge 30 commits intomainfrom
feat/websocket-support

Conversation

@aliev
Copy link
Copy Markdown
Member

@aliev aliev commented Apr 10, 2026

Why

stream-py can produce messages via REST but has no way to consume real-time events (chat, video, custom). The JS SDK has full WebSocket support; this brings parity to the Python SDK so server-side agents can receive events like turn detection, call state changes, and custom payloads.

Changes

  • New getstream/ws/ package with StreamWS class:
    • Auth handshake (JWT + user_details, strict connection.ok validation)
    • Background reader loop with event dispatch via pyee
    • Heartbeat (25s ping in array format per protocol, 35s timeout)
    • Auto-reconnect with exponential backoff + jitter
    • Token refresh on error code 40 (skipped for static tokens)
    • ws_id stale message guard
    • Close code 1000 = no reconnect
  • AsyncStream.connect_ws() convenience method with kwargs override support
  • on_wildcard() now supports decorator syntax
  • [ws] optional dependency group in pyproject.toml
  • 13 unit tests + 2 integration tests (mock WS server, no credentials needed)
  • Internal README documenting protocol, call-watching, and debugging

Tested end-to-end with Vision-Agents: WS receives custom events (turn detection), call lifecycle events, and health checks from Stream's coordinator.

aliev added 22 commits April 10, 2026 17:04
First TDD step toward real-time event consumption. Minimal StreamWS
extends StreamAsyncIOEventEmitter with api_key, api_secret, user_id
and a connected property.
StreamWS now builds the WebSocket URL from the base HTTP URL, replacing
the scheme (https->wss, http->ws) and appending /connect with api_key,
stream-auth-type, and X-Stream-Client query parameters.
StreamWS can now open a WebSocket, send an auth payload (JWT token +
user_details), and handle the connection.ok/connection.error response.
Disconnect closes the socket cleanly. Token is auto-generated from
api_secret when not provided.
Without a reader loop, events sent by the server after auth were
silently dropped. The reader task reads WS messages, parses JSON,
and emits them by type so registered listeners receive events.

Also consolidates duplicate test fixtures into a single mock_server.
Without periodic health checks the client has no way to know the
connection dropped silently. The heartbeat sends health.check messages
at a configurable interval and tracks last-received time for future
timeout detection.
Without reconnect, a dropped connection was permanent. The client now
detects connection loss (via reader loop and heartbeat timeout) and
retries with exponential backoff plus random jitter to avoid thundering
herd. Reconnect task is held in an instance variable to prevent GC.
Previously a token-expired rejection during reconnect was treated as a
fatal auth error. Now the client detects code 40, clears the cached
token so a fresh JWT is generated, and retries within the same backoff
loop.
…ment

Users shouldn't need to wire up StreamWS manually. connect_ws() on
AsyncStream creates, connects, and tracks WS instances so aclose()
can disconnect them all. Also fixes leaked reconnect tasks on
disconnect and a flaky token comparison in tests.
Users install WS support with `pip install getstream[ws]`. Keeps
websockets out of core deps since not all users need real-time events.
Covers connection lifecycle, reconnect strategy, event listening,
configuration, and how to run both unit and manual integration tests.
The JS reference sends health checks as [{type, client_id}] (array).
Our plain object format could be silently ignored by the server.
Previously any non-error response was accepted as successful auth.
The server could send an unexpected message type and we'd silently
treat it as connected. Now we strictly require connection.ok.
Code 1000 is an intentional close per the WS spec. Reconnecting on it
wastes resources and fights the server's intent. Non-1000 codes still
trigger automatic reconnect as before.
During reconnect, the old reader loop could still be processing
messages from the previous socket. The ws_id counter increments on
each connection and the reader exits if it detects a mismatch,
preventing stale events from being emitted.
If the user explicitly passed a token, clearing it on TOKEN_EXPIRED
would silently replace it with an auto-generated one. Now static
tokens are treated as non-refreshable and code 40 becomes a fatal
auth error, matching the JS SDK's isStatic() check.
The WS endpoint is at /api/v2/connect, not /connect. Using the wrong
path caused the server to immediately close with 1000, which looked
like a ConnectionClosedOK before auth could be sent.
on_wildcard("*", handler) worked but @ws.on_wildcard("*") did not,
since listener was required. Now it supports both forms, consistent
with pyee's .on() decorator pattern.
connect_ws passed base_url from self.base_url AND forwarded kwargs,
causing a duplicate keyword argument error when callers tried to
override base_url. Now kwargs take precedence over defaults.
Adds call-watching section (client-side token requirement), corrects
the WS endpoint path to /api/v2/connect, documents array-wrapped
health checks, ws_id guard, close code behavior, static token
protection, and on_wildcard decorator support.
connect() now cleans up any prior tasks and socket state directly,
instead of calling disconnect(). This prevents leaked heartbeat tasks
from a prior normal close (code 1000) from running against a new
socket, without the awkward disconnect-before-connect pattern.

Also wraps self.emit() in try/except so a failing sync listener
doesn't kill the reader loop and silently drop all subsequent events.
aliev added 5 commits April 11, 2026 00:19
_trigger_reconnect now sets _reconnecting=True before scheduling the
task, so two failures in the same event loop tick can't both spawn
reconnect tasks. The redundant guard inside _reconnect is removed.

_open_connection now closes the socket on any handshake failure (not
just wrong response type), preventing leaked connections from recv()
or json.loads() errors.
Four places did "if websocket, try close, ignore error, set None."
Consolidated into _close_websocket(code, reason) with debug logging
on failure.
String replacement for scheme swapping was fragile. urlparse properly
handles the URL components and _replace makes the intent clear.
…ions

If ws.connect() failed (e.g. auth error), the StreamWS object was
abandoned without cleanup, leaking the websocket. Now disconnect()
is called before re-raising.

Also prune disconnected WS instances from _ws_connections on each
connect_ws() call to prevent unbounded list growth.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant