Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
84ae65e
feat: scaffold StreamWS class for general-purpose WebSocket support
aliev Apr 10, 2026
f7c0512
feat: add WS URL construction with scheme replacement and query params
aliev Apr 10, 2026
94a49ae
feat: implement WS connect/disconnect with auth handshake
aliev Apr 10, 2026
0af2023
feat: add background reader task to dispatch events to listeners
aliev Apr 10, 2026
649abe5
feat: add heartbeat loop to detect stale connections
aliev Apr 10, 2026
0843004
feat: auto-reconnect with exponential backoff and jitter
aliev Apr 10, 2026
5f042c4
feat: refresh token on error code 40 during reconnect
aliev Apr 10, 2026
7e22e8d
feat: add AsyncStream.connect_ws() for convenient WS lifecycle manage…
aliev Apr 10, 2026
8741e5e
chore: add [ws] optional dependency group for websockets
aliev Apr 10, 2026
327ca7e
docs: add internal README for WebSocket client usage and testing
aliev Apr 10, 2026
731664f
chore: update uv.lock for new [ws] optional dependency group
aliev Apr 10, 2026
ca86de1
fix: wrap health check message in array to match Stream protocol
aliev Apr 10, 2026
aa0fef4
fix: reject non-connection.ok auth responses
aliev Apr 10, 2026
1e0e997
fix: don't reconnect when server closes with code 1000
aliev Apr 10, 2026
1eb6042
fix: add wsID guard to ignore stale messages from old connections
aliev Apr 10, 2026
5e53358
fix: don't overwrite user-provided static tokens on code 40
aliev Apr 10, 2026
c0a82fa
style: apply formatter to ws client and tests
aliev Apr 10, 2026
28c78af
fix: use /api/v2/connect path for WebSocket endpoint
aliev Apr 10, 2026
a85ffce
fix: support decorator syntax for on_wildcard()
aliev Apr 10, 2026
e0f64a8
fix: allow kwargs to override base_url and user_agent in connect_ws
aliev Apr 10, 2026
d6b94ec
style: apply ruff format and remove unused import
aliev Apr 10, 2026
00fe0b6
docs: update WS README with findings from integration testing
aliev Apr 10, 2026
d68b9de
fix: prevent leaked exceptions in reconnect and disconnect cleanup
aliev Apr 10, 2026
5b1acab
fix: prevent reconnect from cancelling itself via _cancel_tasks
aliev Apr 10, 2026
43d059d
fix: clean up stale tasks on connect and guard emit from listener errors
aliev Apr 10, 2026
6dd9673
fix: prevent duplicate reconnect tasks and socket leaks on auth failure
aliev Apr 10, 2026
e2fb598
refactor: reuse BASE_URL from getstream.stream instead of duplicating
aliev Apr 10, 2026
afcbe25
refactor: extract _close_websocket helper to deduplicate socket cleanup
aliev Apr 10, 2026
47acdca
refactor: use urlparse/urlunparse for WS URL construction
aliev Apr 10, 2026
698ad74
fix: prevent StreamWS leak on connect failure and prune stale connect…
aliev Apr 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions getstream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,40 @@ def moderation(self) -> AsyncModerationClient:
user_agent=self.user_agent,
)

async def connect_ws(
self,
user_id: str,
user_details: Optional[dict] = None,
**kwargs,
):
from getstream.ws import StreamWS

defaults = {
"base_url": self.base_url,
"user_agent": self.user_agent,
}
defaults.update(kwargs)
ws = StreamWS(
api_key=self.api_key,
api_secret=self.api_secret,
user_id=user_id,
user_details=user_details or {"id": user_id},
**defaults,
)
try:
await ws.connect()
except Exception:
await ws.disconnect()
raise
# Remove any previously disconnected WS instances to prevent unbounded growth
self._ws_connections[:] = [w for w in self._ws_connections if w.connected]
self._ws_connections.append(ws)
return ws

@cached_property
def _ws_connections(self) -> list:
return []

async def aclose(self):
"""Close all child clients and the main HTTPX client."""
# AsyncExitStack ensures all clients are closed even if one fails.
Expand All @@ -220,6 +254,10 @@ async def aclose(self):
stack.push_async_callback(self.chat.aclose)
if "moderation" in cached:
stack.push_async_callback(self.moderation.aclose)
if "_ws_connections" in cached:
for ws in self._ws_connections:
stack.push_async_callback(ws.disconnect)
self._ws_connections.clear()
stack.push_async_callback(super().aclose)

@cached_property
Expand Down
27 changes: 21 additions & 6 deletions getstream/utils/event_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,27 @@ def emit(self, event, *args, **kwargs):

return result

def on_wildcard(self, pattern, listener):
"""Register a wildcard event listener"""
if pattern not in self._wildcard_listeners:
self._wildcard_listeners[pattern] = []
self._wildcard_listeners[pattern].append(listener)
return self
def on_wildcard(self, pattern, listener=None):
"""Register a wildcard event listener.

Can be used directly or as a decorator:
ws.on_wildcard("*", handler)

@ws.on_wildcard("*")
def handler(event_type, event):
...
"""

def _register(fn):
if pattern not in self._wildcard_listeners:
self._wildcard_listeners[pattern] = []
self._wildcard_listeners[pattern].append(fn)
return fn

if listener is not None:
_register(listener)
return self
return _register

def remove_wildcard_listener(self, pattern, listener):
"""Remove a specific wildcard listener"""
Expand Down
228 changes: 228 additions & 0 deletions getstream/ws/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
# WebSocket Client (`getstream.ws`)

General-purpose WebSocket client for consuming real-time events from Stream's coordinator API. Works with chat events, video call events, custom events, etc.

## Install

```
pip install getstream[ws]
```

## Quick start

```python
import asyncio
from getstream import AsyncStream

async def main():
client = AsyncStream(api_key="your-key", api_secret="your-secret")

ws = await client.connect_ws(
user_id="alice",
user_details={"id": "alice", "name": "Alice"},
)

@ws.on("message.new")
async def on_message(event):
print(f"New message in {event['cid']}: {event['message']['text']}")

@ws.on_wildcard("call.**")
async def on_call_event(event_type, event):
print(f"{event_type}: {event}")

# keep running until interrupted
try:
await asyncio.Event().wait()
finally:
await ws.disconnect()
await client.aclose()

asyncio.run(main())
```

## Watching a video call

To receive events for a specific video call (custom events, participant joins/leaves, etc.), you must "watch" the call after connecting. The coordinator only delivers call-scoped events to connections that have subscribed.

**Important:** The watch request must use a **client-side token** (JWT with `user_id`), not the server admin token. The coordinator checks `IsClientSide()` and silently skips the subscription for server tokens.

```python
from getstream import AsyncStream
from getstream.utils import build_query_param

client = AsyncStream(api_key="...", api_secret="...")

# 1. Connect WS as the user
ws = await client.connect_ws(user_id="agent")

# 2. Watch the call with a client-side token
user_token = client.create_token("agent")
user_client = AsyncStream(
api_key=client.api_key,
api_secret=client.api_secret,
base_url=client.base_url,
)
user_client.token = user_token
user_client.headers["authorization"] = user_token
user_client.client.headers["authorization"] = user_token

await user_client.video.get(
"/api/v2/video/call/{type}/{id}",
path_params={"type": "default", "id": "my-call-id"},
query_params=build_query_param(connection_id=ws.connection_id),
)
await user_client.aclose()

# 3. Now call events arrive on the WS
@ws.on("custom")
def on_custom(event):
print(event["custom"])
```

## Standalone usage (without AsyncStream)

```python
from getstream.ws import StreamWS

ws = StreamWS(
api_key="your-key",
api_secret="your-secret",
user_id="alice",
user_details={"id": "alice", "name": "Alice"},
)
await ws.connect()
# ... register listeners, do work ...
await ws.disconnect()
```

## How it works

### Connection lifecycle

1. **URL construction** -- `base_url` scheme is swapped (`https` to `wss`, `http` to `ws`), path `/api/v2/connect` is appended, and `api_key`, `stream-auth-type=jwt`, `X-Stream-Client` are added as query params.

2. **Auth handshake** -- on WebSocket open, the client sends:
```json
{"token": "<jwt>", "user_details": {"id": "alice", ...}}
```
Server must respond with `connection.ok`. Any other response type (including `connection.error`) raises `StreamWSAuthError`.

3. **Background tasks** -- after auth, two async tasks start:
- **Reader loop**: reads messages, parses JSON, emits events by their `type` field. Uses a `ws_id` counter to ignore stale messages from old connections during reconnect.
- **Heartbeat loop**: sends `[{"type": "health.check", "client_id": "<connection_id>"}]` (array-wrapped per protocol) every 25s (configurable). If no message is received for 35s (configurable), triggers reconnect.

4. **Disconnect** -- cancels all background tasks (reader, heartbeat, reconnect), closes the socket with code 1000.

### Reconnection

Triggered automatically when:
- The server closes the connection with a non-1000 code (code 1000 = intentional close, no reconnect).
- The heartbeat detects no messages within `healthcheck_timeout`.
- A WebSocket error occurs while sending a heartbeat.

Strategy:
- **Exponential backoff with jitter**: base 0.25s, doubles per attempt, capped at 5s, plus random jitter to prevent thundering herd.
- **Max retries**: 5 by default, then gives up and sets `connected = False`.
- **Token refresh**: if the server rejects with error code 40 (`TOKEN_EXPIRED`) and the token was auto-generated (not a static user-provided token), the cached token is cleared and a fresh JWT is generated for the next attempt. Static tokens are treated as non-refreshable.
- **ws_id guard**: each connection increments an internal counter. The reader loop exits if the counter changes, preventing stale messages from old connections being emitted after reconnect.

### Event listening

`StreamWS` extends `StreamAsyncIOEventEmitter` (pyee-based). Two ways to listen:

```python
# Exact event type
@ws.on("message.new")
def handler(event):
...

# Wildcard patterns (decorator or direct call)
@ws.on_wildcard("message.*") # single level: message.new, message.updated
def handler(event_type, event):
...

@ws.on_wildcard("call.**") # multi level: call.created, call.member.added
def handler(event_type, event):
...

@ws.on_wildcard("*") # all events
def handler(event_type, event):
...
```

Both sync and async handlers are supported. `on_wildcard` works both as a decorator (`@ws.on_wildcard("*")`) and as a direct call (`ws.on_wildcard("*", handler)`).

## Configuration

All constructor params (also accepted as kwargs to `client.connect_ws()`):

| Param | Default | Description |
|-------|---------|-------------|
| `api_key` | required | Stream API key |
| `api_secret` | required | Stream API secret (used to generate JWT) |
| `user_id` | required | User ID to connect as |
| `user_details` | `{"id": user_id}` | User details sent during auth |
| `base_url` | `https://chat.stream-io-api.com` | Base API URL |
| `token` | auto-generated | Pre-generated JWT (skips auto-generation, treated as static/non-refreshable) |
| `user_agent` | `stream-python-client-{VERSION}` | Client identifier |
| `healthcheck_interval` | `25.0` | Seconds between heartbeat pings |
| `healthcheck_timeout` | `35.0` | Seconds of silence before reconnect |
| `max_retries` | `5` | Max reconnect attempts before giving up |
| `backoff_base` | `0.25` | Initial backoff delay in seconds |
| `backoff_max` | `5.0` | Maximum backoff delay in seconds |

Note: `connect_ws()` kwargs override the defaults from `AsyncStream` (e.g. you can pass a different `base_url`).

## Testing

### Unit tests (no credentials needed)

```
uv run pytest tests/ws/ --timeout=10
```

Tests use a local mock WebSocket server (`websockets.serve` on `127.0.0.1:0`) and cover:
- Instantiation and URL construction
- Auth handshake (success, failure, and unexpected response types)
- Event dispatch to typed and wildcard listeners
- Heartbeat message sending (array format)
- Auto-reconnect on server disconnect (non-1000 close codes)
- No reconnect on intentional close (code 1000)
- Token refresh on error code 40
- Static token protection (no refresh for user-provided tokens)
- ws_id stale message guard
- `AsyncStream.connect_ws()` integration and `aclose()` cleanup

### Debugging

Enable WebSocket-level debug logs:

```python
import logging
logging.getLogger("getstream.ws.client").setLevel(logging.DEBUG)
logging.getLogger("websockets.client").setLevel(logging.DEBUG)
```

## Architecture

```
getstream/ws/
__init__.py -- exports StreamWS, StreamWSAuthError
client.py -- StreamWS class (connection, auth, reader, heartbeat, reconnect)

getstream/stream.py -- AsyncStream.connect_ws() convenience method

Tests:
tests/ws/
test_ws_client.py -- unit tests for StreamWS
test_async_stream_ws.py -- integration tests for AsyncStream.connect_ws()
```

Reference implementation: `stream-video-js/packages/client/src/coordinator/connection/connection.ts`

## Known limitations

- **Async only** -- WebSocket is inherently async. Sync users can call `stream.as_async()` to get an `AsyncStream`.
- **No `products` filter yet** -- the JS SDK sends `products: ["video"]` or `["chat"]` to filter events. This can be added to the auth payload when needed.
- **Single connection per `connect_ws()` call** -- each call creates a separate WS connection. No multiplexing.
- **Call watching requires manual setup** -- receiving call-scoped events requires a separate REST call with a client-side token (see "Watching a video call" above). This could be wrapped in a helper method in the future.
3 changes: 3 additions & 0 deletions getstream/ws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .client import StreamWS, StreamWSAuthError

__all__ = ["StreamWS", "StreamWSAuthError"]
Loading
Loading