feat: add general-purpose WebSocket client for real-time event consumption#236
Draft
feat: add general-purpose WebSocket client for real-time event consumption#236
Conversation
First TDD step toward real-time event consumption. Minimal StreamWS extends StreamAsyncIOEventEmitter with api_key, api_secret, user_id and a connected property.
StreamWS now builds the WebSocket URL from the base HTTP URL, replacing the scheme (https->wss, http->ws) and appending /connect with api_key, stream-auth-type, and X-Stream-Client query parameters.
StreamWS can now open a WebSocket, send an auth payload (JWT token + user_details), and handle the connection.ok/connection.error response. Disconnect closes the socket cleanly. Token is auto-generated from api_secret when not provided.
Without a reader loop, events sent by the server after auth were silently dropped. The reader task reads WS messages, parses JSON, and emits them by type so registered listeners receive events. Also consolidates duplicate test fixtures into a single mock_server.
Without periodic health checks the client has no way to know the connection dropped silently. The heartbeat sends health.check messages at a configurable interval and tracks last-received time for future timeout detection.
Without reconnect, a dropped connection was permanent. The client now detects connection loss (via reader loop and heartbeat timeout) and retries with exponential backoff plus random jitter to avoid thundering herd. Reconnect task is held in an instance variable to prevent GC.
Previously a token-expired rejection during reconnect was treated as a fatal auth error. Now the client detects code 40, clears the cached token so a fresh JWT is generated, and retries within the same backoff loop.
…ment Users shouldn't need to wire up StreamWS manually. connect_ws() on AsyncStream creates, connects, and tracks WS instances so aclose() can disconnect them all. Also fixes leaked reconnect tasks on disconnect and a flaky token comparison in tests.
Users install WS support with `pip install getstream[ws]`. Keeps websockets out of core deps since not all users need real-time events.
Covers connection lifecycle, reconnect strategy, event listening, configuration, and how to run both unit and manual integration tests.
The JS reference sends health checks as [{type, client_id}] (array).
Our plain object format could be silently ignored by the server.
Previously any non-error response was accepted as successful auth. The server could send an unexpected message type and we'd silently treat it as connected. Now we strictly require connection.ok.
Code 1000 is an intentional close per the WS spec. Reconnecting on it wastes resources and fights the server's intent. Non-1000 codes still trigger automatic reconnect as before.
During reconnect, the old reader loop could still be processing messages from the previous socket. The ws_id counter increments on each connection and the reader exits if it detects a mismatch, preventing stale events from being emitted.
If the user explicitly passed a token, clearing it on TOKEN_EXPIRED would silently replace it with an auto-generated one. Now static tokens are treated as non-refreshable and code 40 becomes a fatal auth error, matching the JS SDK's isStatic() check.
The WS endpoint is at /api/v2/connect, not /connect. Using the wrong path caused the server to immediately close with 1000, which looked like a ConnectionClosedOK before auth could be sent.
on_wildcard("*", handler) worked but @ws.on_wildcard("*") did not,
since listener was required. Now it supports both forms, consistent
with pyee's .on() decorator pattern.
connect_ws passed base_url from self.base_url AND forwarded kwargs, causing a duplicate keyword argument error when callers tried to override base_url. Now kwargs take precedence over defaults.
Adds call-watching section (client-side token requirement), corrects the WS endpoint path to /api/v2/connect, documents array-wrapped health checks, ws_id guard, close code behavior, static token protection, and on_wildcard decorator support.
connect() now cleans up any prior tasks and socket state directly, instead of calling disconnect(). This prevents leaked heartbeat tasks from a prior normal close (code 1000) from running against a new socket, without the awkward disconnect-before-connect pattern. Also wraps self.emit() in try/except so a failing sync listener doesn't kill the reader loop and silently drop all subsequent events.
06b26d6 to
43d059d
Compare
_trigger_reconnect now sets _reconnecting=True before scheduling the task, so two failures in the same event loop tick can't both spawn reconnect tasks. The redundant guard inside _reconnect is removed. _open_connection now closes the socket on any handshake failure (not just wrong response type), preventing leaked connections from recv() or json.loads() errors.
Four places did "if websocket, try close, ignore error, set None." Consolidated into _close_websocket(code, reason) with debug logging on failure.
String replacement for scheme swapping was fragile. urlparse properly handles the URL components and _replace makes the intent clear.
…ions If ws.connect() failed (e.g. auth error), the StreamWS object was abandoned without cleanup, leaking the websocket. Now disconnect() is called before re-raising. Also prune disconnected WS instances from _ws_connections on each connect_ws() call to prevent unbounded list growth.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why
stream-py can produce messages via REST but has no way to consume real-time events (chat, video, custom). The JS SDK has full WebSocket support; this brings parity to the Python SDK so server-side agents can receive events like turn detection, call state changes, and custom payloads.
Changes
getstream/ws/package withStreamWSclass:connection.okvalidation)AsyncStream.connect_ws()convenience method with kwargs override supporton_wildcard()now supports decorator syntax[ws]optional dependency group in pyproject.tomlTested end-to-end with Vision-Agents: WS receives custom events (turn detection), call lifecycle events, and health checks from Stream's coordinator.