From 84ae65eab01ff56eb13b9df8e914b39fec1bcc32 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:04:50 +0400 Subject: [PATCH 01/39] feat: scaffold StreamWS class for general-purpose WebSocket support First TDD step toward real-time event consumption. Minimal StreamWS extends StreamAsyncIOEventEmitter with api_key, api_secret, user_id and a connected property. --- getstream/ws/__init__.py | 3 +++ getstream/ws/client.py | 19 +++++++++++++++++++ tests/ws/__init__.py | 0 tests/ws/test_ws_client.py | 12 ++++++++++++ 4 files changed, 34 insertions(+) create mode 100644 getstream/ws/__init__.py create mode 100644 getstream/ws/client.py create mode 100644 tests/ws/__init__.py create mode 100644 tests/ws/test_ws_client.py diff --git a/getstream/ws/__init__.py b/getstream/ws/__init__.py new file mode 100644 index 00000000..e37611b7 --- /dev/null +++ b/getstream/ws/__init__.py @@ -0,0 +1,3 @@ +from .client import StreamWS + +__all__ = ["StreamWS"] diff --git a/getstream/ws/client.py b/getstream/ws/client.py new file mode 100644 index 00000000..d1876bab --- /dev/null +++ b/getstream/ws/client.py @@ -0,0 +1,19 @@ +from getstream.utils.event_emitter import StreamAsyncIOEventEmitter + + +class StreamWS(StreamAsyncIOEventEmitter): + def __init__( + self, + api_key: str, + api_secret: str, + user_id: str, + ): + super().__init__() + self.api_key = api_key + self.api_secret = api_secret + self.user_id = user_id + self._connected = False + + @property + def connected(self) -> bool: + return self._connected diff --git a/tests/ws/__init__.py b/tests/ws/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py new file mode 100644 index 00000000..16ac0355 --- /dev/null +++ b/tests/ws/test_ws_client.py @@ -0,0 +1,12 @@ +from getstream.ws import StreamWS + + +def test_stream_ws_instantiation(): + ws = StreamWS( + api_key="test-key", + api_secret="test-secret", + user_id="alice", + ) + assert ws.api_key == "test-key" + assert ws.user_id == "alice" + assert not ws.connected From f7c051231f47ffce0cd35ac4ff128310bd4479ac Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:07:01 +0400 Subject: [PATCH 02/39] feat: add WS URL construction with scheme replacement and query params StreamWS now builds the WebSocket URL from the base HTTP URL, replacing the scheme (https->wss, http->ws) and appending /connect with api_key, stream-auth-type, and X-Stream-Client query parameters. --- getstream/ws/client.py | 24 ++++++++++++++++++++++++ tests/ws/test_ws_client.py | 24 ++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index d1876bab..2ce273e4 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -1,4 +1,9 @@ +from urllib.parse import urlencode + from getstream.utils.event_emitter import StreamAsyncIOEventEmitter +from getstream.version import VERSION + +DEFAULT_BASE_URL = "https://chat.stream-io-api.com" class StreamWS(StreamAsyncIOEventEmitter): @@ -7,13 +12,32 @@ def __init__( api_key: str, api_secret: str, user_id: str, + *, + base_url: str = DEFAULT_BASE_URL, + user_agent: str | None = None, ): super().__init__() self.api_key = api_key self.api_secret = api_secret self.user_id = user_id + self._base_url = base_url.rstrip("/") + self._user_agent = user_agent or f"stream-python-client-{VERSION}" self._connected = False + @property + def ws_url(self) -> str: + scheme = self._base_url.replace("https://", "wss://").replace( + "http://", "ws://" + ) + params = urlencode( + { + "api_key": self.api_key, + "stream-auth-type": "jwt", + "X-Stream-Client": self._user_agent, + } + ) + return f"{scheme}/connect?{params}" + @property def connected(self) -> bool: return self._connected diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index 16ac0355..a076a15d 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -10,3 +10,27 @@ def test_stream_ws_instantiation(): assert ws.api_key == "test-key" assert ws.user_id == "alice" assert not ws.connected + + +def test_ws_url_construction(): + ws = StreamWS( + api_key="my-key", + api_secret="my-secret", + user_id="alice", + base_url="https://chat.stream-io-api.com", + ) + url = ws.ws_url + assert url.startswith("wss://chat.stream-io-api.com/connect?") + assert "api_key=my-key" in url + assert "stream-auth-type=jwt" in url + + +def test_ws_url_construction_http(): + ws = StreamWS( + api_key="k", + api_secret="s", + user_id="bob", + base_url="http://localhost:3030", + ) + url = ws.ws_url + assert url.startswith("ws://localhost:3030/connect?") From 94a49ae68388714ca41759398c3076217763ef23 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:12:55 +0400 Subject: [PATCH 03/39] feat: implement WS connect/disconnect with auth handshake StreamWS can now open a WebSocket, send an auth payload (JWT token + user_details), and handle the connection.ok/connection.error response. Disconnect closes the socket cleanly. Token is auto-generated from api_secret when not provided. --- getstream/ws/__init__.py | 4 +- getstream/ws/client.py | 78 +++++++++++++++++++++++++++++++++++++- tests/ws/test_ws_client.py | 58 ++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 3 deletions(-) diff --git a/getstream/ws/__init__.py b/getstream/ws/__init__.py index e37611b7..983cdc9e 100644 --- a/getstream/ws/__init__.py +++ b/getstream/ws/__init__.py @@ -1,3 +1,3 @@ -from .client import StreamWS +from .client import StreamWS, StreamWSAuthError -__all__ = ["StreamWS"] +__all__ = ["StreamWS", "StreamWSAuthError"] diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 2ce273e4..f27225d9 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -1,11 +1,25 @@ +import json +import logging +import time +from typing import Optional from urllib.parse import urlencode +import jwt +import websockets +from websockets import ClientConnection + from getstream.utils.event_emitter import StreamAsyncIOEventEmitter from getstream.version import VERSION +logger = logging.getLogger(__name__) + DEFAULT_BASE_URL = "https://chat.stream-io-api.com" +class StreamWSAuthError(Exception): + pass + + class StreamWS(StreamAsyncIOEventEmitter): def __init__( self, @@ -15,6 +29,8 @@ def __init__( *, base_url: str = DEFAULT_BASE_URL, user_agent: str | None = None, + user_details: Optional[dict] = None, + token: Optional[str] = None, ): super().__init__() self.api_key = api_key @@ -22,7 +38,12 @@ def __init__( self.user_id = user_id self._base_url = base_url.rstrip("/") self._user_agent = user_agent or f"stream-python-client-{VERSION}" + self._user_details = user_details or {"id": user_id} + self._token = token + + self._websocket: Optional[ClientConnection] = None self._connected = False + self._connection_id: Optional[str] = None @property def ws_url(self) -> str: @@ -40,4 +61,59 @@ def ws_url(self) -> str: @property def connected(self) -> bool: - return self._connected + return self._connected and self._websocket is not None + + @property + def connection_id(self) -> Optional[str]: + return self._connection_id + + def _ensure_token(self) -> str: + if self._token: + return self._token + now = int(time.time()) + self._token = jwt.encode( + {"user_id": self.user_id, "iat": now - 5}, + self.api_secret, + algorithm="HS256", + ) + return self._token + + async def connect(self) -> dict: + if self._connected: + await self.disconnect() + + self._websocket = await websockets.connect( + self.ws_url, + ping_interval=None, + ping_timeout=None, + close_timeout=1.0, + ) + + auth_payload = { + "token": self._ensure_token(), + "user_details": self._user_details, + } + await self._websocket.send(json.dumps(auth_payload)) + + raw = await self._websocket.recv() + message = json.loads(raw) + + if message.get("type") in ("error", "connection.error"): + await self._websocket.close() + self._websocket = None + raise StreamWSAuthError(f"Authentication failed: {message}") + + self._connection_id = message.get("connection_id") + self._connected = True + return message + + async def disconnect(self) -> None: + self._connected = False + self._connection_id = None + if self._websocket: + try: + await self._websocket.close(code=1000, reason="client disconnect") + except Exception: + pass + finally: + self._websocket = None diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index a076a15d..6b9dca16 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -1,3 +1,9 @@ +import json + +import pytest +import pytest_asyncio +import websockets + from getstream.ws import StreamWS @@ -34,3 +40,55 @@ def test_ws_url_construction_http(): ) url = ws.ws_url assert url.startswith("ws://localhost:3030/connect?") + + +@pytest_asyncio.fixture() +async def mock_ws_server(): + """Start a local WebSocket server that echoes connection.ok on auth.""" + received = [] + + async def handler(ws): + raw = await ws.recv() + msg = json.loads(raw) + received.append(msg) + await ws.send( + json.dumps( + { + "type": "connection.ok", + "connection_id": "conn-123", + "me": {"id": msg["user_details"]["id"]}, + } + ) + ) + # Keep connection open until client disconnects + try: + async for _ in ws: + pass + except websockets.exceptions.ConnectionClosed: + pass + + async with websockets.serve(handler, "127.0.0.1", 0) as server: + port = server.sockets[0].getsockname()[1] + yield {"port": port, "received": received} + + +@pytest.mark.asyncio +async def test_connect_and_authenticate(mock_ws_server): + ws = StreamWS( + api_key="test-key", + api_secret="test-secret", + user_id="alice", + base_url=f"http://127.0.0.1:{mock_ws_server['port']}", + ) + result = await ws.connect() + + assert ws.connected + assert ws.connection_id == "conn-123" + assert result["type"] == "connection.ok" + + # Verify the auth payload sent to server + auth = mock_ws_server["received"][0] + assert "token" in auth + assert auth["user_details"]["id"] == "alice" + + await ws.disconnect() From 0af2023287a6dbbd8add75ef79eafd64f2f5c041 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:23:01 +0400 Subject: [PATCH 04/39] feat: add background reader task to dispatch events to listeners Without a reader loop, events sent by the server after auth were silently dropped. The reader task reads WS messages, parses JSON, and emits them by type so registered listeners receive events. Also consolidates duplicate test fixtures into a single mock_server. --- getstream/ws/client.py | 23 +++++++++++++++++ tests/ws/test_ws_client.py | 51 ++++++++++++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index f27225d9..a3003c08 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -1,3 +1,4 @@ +import asyncio import json import logging import time @@ -44,6 +45,7 @@ def __init__( self._websocket: Optional[ClientConnection] = None self._connected = False self._connection_id: Optional[str] = None + self._reader_task: Optional[asyncio.Task] = None @property def ws_url(self) -> str: @@ -105,11 +107,32 @@ async def connect(self) -> dict: self._connection_id = message.get("connection_id") self._connected = True + self._reader_task = asyncio.create_task(self._reader_loop()) return message + async def _reader_loop(self) -> None: + while self._connected and self._websocket: + try: + raw = await self._websocket.recv() + message = json.loads(raw) + event_type = message.get("type", "unknown") + self.emit(event_type, message) + except websockets.exceptions.ConnectionClosed: + logger.debug("WebSocket connection closed in reader") + break + except json.JSONDecodeError: + logger.warning("Failed to parse WebSocket message as JSON") + async def disconnect(self) -> None: self._connected = False self._connection_id = None + if self._reader_task: + self._reader_task.cancel() + try: + await self._reader_task + except asyncio.CancelledError: + pass + self._reader_task = None if self._websocket: try: await self._websocket.close(code=1000, reason="client disconnect") diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index 6b9dca16..f9f4fbed 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -1,3 +1,4 @@ +import asyncio import json import pytest @@ -43,14 +44,16 @@ def test_ws_url_construction_http(): @pytest_asyncio.fixture() -async def mock_ws_server(): - """Start a local WebSocket server that echoes connection.ok on auth.""" - received = [] +async def mock_server(): + """WS server that handles auth and keeps connections open.""" + auth_payloads = [] + connections = [] async def handler(ws): + connections.append(ws) raw = await ws.recv() msg = json.loads(raw) - received.append(msg) + auth_payloads.append(msg) await ws.send( json.dumps( { @@ -60,7 +63,6 @@ async def handler(ws): } ) ) - # Keep connection open until client disconnects try: async for _ in ws: pass @@ -69,16 +71,20 @@ async def handler(ws): async with websockets.serve(handler, "127.0.0.1", 0) as server: port = server.sockets[0].getsockname()[1] - yield {"port": port, "received": received} + yield { + "port": port, + "auth_payloads": auth_payloads, + "connections": connections, + } @pytest.mark.asyncio -async def test_connect_and_authenticate(mock_ws_server): +async def test_connect_and_authenticate(mock_server): ws = StreamWS( api_key="test-key", api_secret="test-secret", user_id="alice", - base_url=f"http://127.0.0.1:{mock_ws_server['port']}", + base_url=f"http://127.0.0.1:{mock_server['port']}", ) result = await ws.connect() @@ -86,9 +92,34 @@ async def test_connect_and_authenticate(mock_ws_server): assert ws.connection_id == "conn-123" assert result["type"] == "connection.ok" - # Verify the auth payload sent to server - auth = mock_ws_server["received"][0] + auth = mock_server["auth_payloads"][0] assert "token" in auth assert auth["user_details"]["id"] == "alice" await ws.disconnect() + + +@pytest.mark.asyncio +async def test_event_dispatched_to_listener(mock_server): + ws = StreamWS( + api_key="k", + api_secret="s" * 32, + user_id="alice", + base_url=f"http://127.0.0.1:{mock_server['port']}", + ) + await ws.connect() + + received_events = [] + + @ws.on("message.new") + def on_message(event): + received_events.append(event) + + server_ws = mock_server["connections"][0] + await server_ws.send(json.dumps({"type": "message.new", "text": "hello"})) + await asyncio.sleep(0.1) + + assert len(received_events) == 1 + assert received_events[0]["text"] == "hello" + + await ws.disconnect() From 649abe54034614bcfdda2eb0098a253d0b2c376c Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:27:08 +0400 Subject: [PATCH 05/39] feat: add heartbeat loop to detect stale connections Without periodic health checks the client has no way to know the connection dropped silently. The heartbeat sends health.check messages at a configurable interval and tracks last-received time for future timeout detection. --- getstream/ws/client.py | 41 +++++++++++++++++++++++++++++++------- tests/ws/test_ws_client.py | 31 ++++++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index a3003c08..ff1b3190 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -32,6 +32,8 @@ def __init__( user_agent: str | None = None, user_details: Optional[dict] = None, token: Optional[str] = None, + healthcheck_interval: float = 25.0, + healthcheck_timeout: float = 35.0, ): super().__init__() self.api_key = api_key @@ -41,11 +43,15 @@ def __init__( self._user_agent = user_agent or f"stream-python-client-{VERSION}" self._user_details = user_details or {"id": user_id} self._token = token + self._healthcheck_interval = healthcheck_interval + self._healthcheck_timeout = healthcheck_timeout self._websocket: Optional[ClientConnection] = None self._connected = False self._connection_id: Optional[str] = None self._reader_task: Optional[asyncio.Task] = None + self._heartbeat_task: Optional[asyncio.Task] = None + self._last_received: float = 0.0 @property def ws_url(self) -> str: @@ -107,13 +113,16 @@ async def connect(self) -> dict: self._connection_id = message.get("connection_id") self._connected = True + self._last_received = time.monotonic() self._reader_task = asyncio.create_task(self._reader_loop()) + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) return message async def _reader_loop(self) -> None: while self._connected and self._websocket: try: raw = await self._websocket.recv() + self._last_received = time.monotonic() message = json.loads(raw) event_type = message.get("type", "unknown") self.emit(event_type, message) @@ -123,16 +132,34 @@ async def _reader_loop(self) -> None: except json.JSONDecodeError: logger.warning("Failed to parse WebSocket message as JSON") + async def _heartbeat_loop(self) -> None: + while self._connected: + await asyncio.sleep(self._healthcheck_interval) + if not self._connected or not self._websocket: + break + try: + msg = {"type": "health.check", "client_id": self._connection_id} + await self._websocket.send(json.dumps(msg)) + logger.debug("Sent heartbeat") + except websockets.exceptions.ConnectionClosed: + logger.debug("WebSocket closed while sending heartbeat") + break + + async def _cancel_tasks(self) -> None: + for task in (self._reader_task, self._heartbeat_task): + if task: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + self._reader_task = None + self._heartbeat_task = None + async def disconnect(self) -> None: self._connected = False self._connection_id = None - if self._reader_task: - self._reader_task.cancel() - try: - await self._reader_task - except asyncio.CancelledError: - pass - self._reader_task = None + await self._cancel_tasks() if self._websocket: try: await self._websocket.close(code=1000, reason="client disconnect") diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index f9f4fbed..a4b72df2 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -49,6 +49,8 @@ async def mock_server(): auth_payloads = [] connections = [] + client_messages = [] + async def handler(ws): connections.append(ws) raw = await ws.recv() @@ -64,8 +66,8 @@ async def handler(ws): ) ) try: - async for _ in ws: - pass + async for raw_msg in ws: + client_messages.append(json.loads(raw_msg)) except websockets.exceptions.ConnectionClosed: pass @@ -75,6 +77,7 @@ async def handler(ws): "port": port, "auth_payloads": auth_payloads, "connections": connections, + "client_messages": client_messages, } @@ -123,3 +126,27 @@ def on_message(event): assert received_events[0]["text"] == "hello" await ws.disconnect() + + +@pytest.mark.asyncio +async def test_heartbeat_sent(mock_server): + ws = StreamWS( + api_key="k", + api_secret="s" * 32, + user_id="alice", + base_url=f"http://127.0.0.1:{mock_server['port']}", + healthcheck_interval=0.1, + ) + await ws.connect() + + # Wait long enough for at least 2 heartbeats + await asyncio.sleep(0.35) + + heartbeats = [ + m for m in mock_server["client_messages"] + if m.get("type") == "health.check" + ] + assert len(heartbeats) >= 2 + assert heartbeats[0]["client_id"] == "conn-123" + + await ws.disconnect() From 08430043167fb1fd9155ba5b176949912c88bb80 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:33:54 +0400 Subject: [PATCH 06/39] feat: auto-reconnect with exponential backoff and jitter Without reconnect, a dropped connection was permanent. The client now detects connection loss (via reader loop and heartbeat timeout) and retries with exponential backoff plus random jitter to avoid thundering herd. Reconnect task is held in an instance variable to prevent GC. --- getstream/ws/client.py | 87 +++++++++++++++++++++++++++++++++++--- tests/ws/test_ws_client.py | 28 ++++++++++++ 2 files changed, 109 insertions(+), 6 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index ff1b3190..88f03ec9 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import random import time from typing import Optional from urllib.parse import urlencode @@ -34,6 +35,9 @@ def __init__( token: Optional[str] = None, healthcheck_interval: float = 25.0, healthcheck_timeout: float = 35.0, + max_retries: int = 5, + backoff_base: float = 0.25, + backoff_max: float = 5.0, ): super().__init__() self.api_key = api_key @@ -45,6 +49,9 @@ def __init__( self._token = token self._healthcheck_interval = healthcheck_interval self._healthcheck_timeout = healthcheck_timeout + self._max_retries = max_retries + self._backoff_base = backoff_base + self._backoff_max = backoff_max self._websocket: Optional[ClientConnection] = None self._connected = False @@ -52,6 +59,8 @@ def __init__( self._reader_task: Optional[asyncio.Task] = None self._heartbeat_task: Optional[asyncio.Task] = None self._last_received: float = 0.0 + self._reconnecting = False + self._reconnect_task: Optional[asyncio.Task] = None @property def ws_url(self) -> str: @@ -86,10 +95,7 @@ def _ensure_token(self) -> str: ) return self._token - async def connect(self) -> dict: - if self._connected: - await self.disconnect() - + async def _open_connection(self) -> dict: self._websocket = await websockets.connect( self.ws_url, ping_interval=None, @@ -112,11 +118,25 @@ async def connect(self) -> dict: raise StreamWSAuthError(f"Authentication failed: {message}") self._connection_id = message.get("connection_id") - self._connected = True self._last_received = time.monotonic() + return message + + async def connect(self) -> dict: + if self._connected: + await self.disconnect() + + message = await self._open_connection() + self._connected = True + self._start_tasks() + return message + + def _start_tasks(self) -> None: self._reader_task = asyncio.create_task(self._reader_loop()) self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) - return message + + def _trigger_reconnect(self, reason: str) -> None: + if self._connected and not self._reconnecting: + self._reconnect_task = asyncio.create_task(self._reconnect(reason)) async def _reader_loop(self) -> None: while self._connected and self._websocket: @@ -128,6 +148,7 @@ async def _reader_loop(self) -> None: self.emit(event_type, message) except websockets.exceptions.ConnectionClosed: logger.debug("WebSocket connection closed in reader") + self._trigger_reconnect("connection closed") break except json.JSONDecodeError: logger.warning("Failed to parse WebSocket message as JSON") @@ -137,14 +158,68 @@ async def _heartbeat_loop(self) -> None: await asyncio.sleep(self._healthcheck_interval) if not self._connected or not self._websocket: break + + elapsed = time.monotonic() - self._last_received + if elapsed > self._healthcheck_timeout: + logger.warning("Healthcheck timeout (%.1fs)", elapsed) + self._trigger_reconnect("healthcheck timeout") + break + try: msg = {"type": "health.check", "client_id": self._connection_id} await self._websocket.send(json.dumps(msg)) logger.debug("Sent heartbeat") except websockets.exceptions.ConnectionClosed: logger.debug("WebSocket closed while sending heartbeat") + self._trigger_reconnect("connection closed") break + async def _reconnect(self, reason: str) -> None: + if self._reconnecting: + return + self._reconnecting = True + logger.info("Reconnecting: %s", reason) + + try: + await self._cancel_tasks() + if self._websocket: + try: + await self._websocket.close() + except Exception: + pass + self._websocket = None + + for attempt in range(1, self._max_retries + 1): + if not self._connected: + return + delay = min( + self._backoff_base * (2 ** (attempt - 1)), + self._backoff_max, + ) + delay *= 1 + random.random() + logger.debug("Reconnect attempt %d/%d in %.2fs", attempt, self._max_retries, delay) + await asyncio.sleep(delay) + + if not self._connected: + return + + try: + await self._open_connection() + self._start_tasks() + logger.info("Reconnected after %d attempt(s)", attempt) + return + except StreamWSAuthError: + logger.error("Auth failed during reconnect") + self._connected = False + raise + except Exception as e: + logger.warning("Reconnect attempt %d failed: %s", attempt, e) + + logger.error("All %d reconnect attempts failed", self._max_retries) + self._connected = False + finally: + self._reconnecting = False + async def _cancel_tasks(self) -> None: for task in (self._reader_task, self._heartbeat_task): if task: diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index a4b72df2..65acfcb7 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -150,3 +150,31 @@ async def test_heartbeat_sent(mock_server): assert heartbeats[0]["client_id"] == "conn-123" await ws.disconnect() + + +@pytest.mark.asyncio +async def test_reconnect_on_server_close(mock_server): + ws = StreamWS( + api_key="k", + api_secret="s" * 32, + user_id="alice", + base_url=f"http://127.0.0.1:{mock_server['port']}", + healthcheck_interval=100, + max_retries=3, + ) + await ws.connect() + assert ws.connected + assert len(mock_server["connections"]) == 1 + + # Server forcibly closes the connection + await mock_server["connections"][0].close() + + # Wait for reconnect + await asyncio.sleep(0.5) + + assert ws.connected + # A second connection was made (the reconnect) + assert len(mock_server["connections"]) == 2 + assert len(mock_server["auth_payloads"]) == 2 + + await ws.disconnect() From 5f042c4be777402f98203581caa75e7ae3c50047 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:36:15 +0400 Subject: [PATCH 07/39] feat: refresh token on error code 40 during reconnect Previously a token-expired rejection during reconnect was treated as a fatal auth error. Now the client detects code 40, clears the cached token so a fresh JWT is generated, and retries within the same backoff loop. --- getstream/ws/client.py | 16 +++++++-- tests/ws/test_ws_client.py | 72 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 3 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 88f03ec9..0fd1a802 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -18,8 +18,13 @@ DEFAULT_BASE_URL = "https://chat.stream-io-api.com" +TOKEN_EXPIRED_CODE = 40 + + class StreamWSAuthError(Exception): - pass + def __init__(self, message: str, response: dict | None = None): + super().__init__(message) + self.response = response or {} class StreamWS(StreamAsyncIOEventEmitter): @@ -115,7 +120,7 @@ async def _open_connection(self) -> dict: if message.get("type") in ("error", "connection.error"): await self._websocket.close() self._websocket = None - raise StreamWSAuthError(f"Authentication failed: {message}") + raise StreamWSAuthError(f"Authentication failed: {message}", response=message) self._connection_id = message.get("connection_id") self._last_received = time.monotonic() @@ -208,7 +213,12 @@ async def _reconnect(self, reason: str) -> None: self._start_tasks() logger.info("Reconnected after %d attempt(s)", attempt) return - except StreamWSAuthError: + except StreamWSAuthError as e: + error_code = e.response.get("error", {}).get("code") + if error_code == TOKEN_EXPIRED_CODE: + logger.info("Token expired (code 40), refreshing") + self._token = None + continue logger.error("Auth failed during reconnect") self._connected = False raise diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index 65acfcb7..c39f82ae 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -178,3 +178,75 @@ async def test_reconnect_on_server_close(mock_server): assert len(mock_server["auth_payloads"]) == 2 await ws.disconnect() + + +@pytest_asyncio.fixture() +async def token_expiry_server(): + """Server that rejects the 2nd connection with code 40, then accepts the 3rd.""" + auth_payloads = [] + connect_count = 0 + + async def handler(ws): + nonlocal connect_count + connect_count += 1 + raw = await ws.recv() + msg = json.loads(raw) + auth_payloads.append(msg) + + if connect_count == 2: + # Reject with token expired + await ws.send( + json.dumps( + { + "type": "connection.error", + "error": {"code": 40, "message": "token expired"}, + } + ) + ) + return + + await ws.send( + json.dumps( + { + "type": "connection.ok", + "connection_id": f"conn-{connect_count}", + "me": {"id": msg["user_details"]["id"]}, + } + ) + ) + try: + async for _ in ws: + pass + except websockets.exceptions.ConnectionClosed: + pass + + async with websockets.serve(handler, "127.0.0.1", 0) as server: + port = server.sockets[0].getsockname()[1] + yield {"port": port, "auth_payloads": auth_payloads} + + +@pytest.mark.asyncio +async def test_token_refresh_on_expired(token_expiry_server): + ws = StreamWS( + api_key="k", + api_secret="s" * 32, + user_id="alice", + base_url=f"http://127.0.0.1:{token_expiry_server['port']}", + healthcheck_interval=100, + max_retries=3, + ) + await ws.connect() + first_token = token_expiry_server["auth_payloads"][0]["token"] + + # Server closes -> reconnect hits code 40 -> should refresh token and retry + await ws._websocket.close() + await asyncio.sleep(1.0) + + assert ws.connected + # 3 auth attempts: initial, rejected (code 40), successful retry + assert len(token_expiry_server["auth_payloads"]) == 3 + # Token should have been refreshed (different from first) + third_token = token_expiry_server["auth_payloads"][2]["token"] + assert third_token != first_token + + await ws.disconnect() From 7e22e8d85712066d70de3e0e04770159d981a69f Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:46:47 +0400 Subject: [PATCH 08/39] feat: add AsyncStream.connect_ws() for convenient WS lifecycle management Users shouldn't need to wire up StreamWS manually. connect_ws() on AsyncStream creates, connects, and tracks WS instances so aclose() can disconnect them all. Also fixes leaked reconnect tasks on disconnect and a flaky token comparison in tests. --- getstream/stream.py | 29 ++++++++++++++ getstream/ws/client.py | 3 +- tests/ws/test_async_stream_ws.py | 68 ++++++++++++++++++++++++++++++++ tests/ws/test_ws_client.py | 10 ++--- 4 files changed, 103 insertions(+), 7 deletions(-) create mode 100644 tests/ws/test_async_stream_ws.py diff --git a/getstream/stream.py b/getstream/stream.py index ec946b01..92f156e4 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -208,6 +208,31 @@ def moderation(self) -> AsyncModerationClient: user_agent=self.user_agent, ) + async def connect_ws( + self, + user_id: str, + user_details: Optional[dict] = None, + **kwargs, + ): + from getstream.ws import StreamWS + + ws = StreamWS( + api_key=self.api_key, + api_secret=self.api_secret, + user_id=user_id, + user_details=user_details or {"id": user_id}, + base_url=self.base_url, + user_agent=self.user_agent, + **kwargs, + ) + await ws.connect() + self._ws_connections.append(ws) + return ws + + @cached_property + def _ws_connections(self) -> list: + return [] + async def aclose(self): """Close all child clients and the main HTTPX client.""" # AsyncExitStack ensures all clients are closed even if one fails. @@ -220,6 +245,10 @@ async def aclose(self): stack.push_async_callback(self.chat.aclose) if "moderation" in cached: stack.push_async_callback(self.moderation.aclose) + if "_ws_connections" in cached: + for ws in self._ws_connections: + stack.push_async_callback(ws.disconnect) + self._ws_connections.clear() stack.push_async_callback(super().aclose) @cached_property diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 0fd1a802..8dd3f404 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -231,7 +231,7 @@ async def _reconnect(self, reason: str) -> None: self._reconnecting = False async def _cancel_tasks(self) -> None: - for task in (self._reader_task, self._heartbeat_task): + for task in (self._reader_task, self._heartbeat_task, self._reconnect_task): if task: task.cancel() try: @@ -240,6 +240,7 @@ async def _cancel_tasks(self) -> None: pass self._reader_task = None self._heartbeat_task = None + self._reconnect_task = None async def disconnect(self) -> None: self._connected = False diff --git a/tests/ws/test_async_stream_ws.py b/tests/ws/test_async_stream_ws.py new file mode 100644 index 00000000..c3f7dd6c --- /dev/null +++ b/tests/ws/test_async_stream_ws.py @@ -0,0 +1,68 @@ +import asyncio +import json + +import pytest +import pytest_asyncio +import websockets + +from getstream import AsyncStream + + +@pytest_asyncio.fixture() +async def mock_server(): + """WS server that handles auth and keeps connections open.""" + connections = [] + + async def handler(ws): + connections.append(ws) + raw = await ws.recv() + msg = json.loads(raw) + await ws.send( + json.dumps( + { + "type": "connection.ok", + "connection_id": "conn-int", + "me": {"id": msg["user_details"]["id"]}, + } + ) + ) + try: + async for _ in ws: + pass + except websockets.exceptions.ConnectionClosed: + pass + + async with websockets.serve(handler, "127.0.0.1", 0) as server: + port = server.sockets[0].getsockname()[1] + yield {"port": port, "connections": connections} + + +@pytest.mark.asyncio +async def test_connect_ws(mock_server): + client = AsyncStream( + api_key="k" * 32, + api_secret="s" * 32, + base_url=f"http://127.0.0.1:{mock_server['port']}", + ) + ws = await client.connect_ws(user_id="alice") + + assert ws.connected + assert ws.connection_id == "conn-int" + assert ws.user_id == "alice" + + await ws.disconnect() + await client.aclose() + + +@pytest.mark.asyncio +async def test_aclose_disconnects_ws(mock_server): + client = AsyncStream( + api_key="k" * 32, + api_secret="s" * 32, + base_url=f"http://127.0.0.1:{mock_server['port']}", + ) + ws = await client.connect_ws(user_id="bob") + assert ws.connected + + await client.aclose() + assert not ws.connected diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index c39f82ae..455e6ed1 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -233,20 +233,18 @@ async def test_token_refresh_on_expired(token_expiry_server): user_id="alice", base_url=f"http://127.0.0.1:{token_expiry_server['port']}", healthcheck_interval=100, - max_retries=3, + max_retries=5, + backoff_base=0.05, + backoff_max=0.1, ) await ws.connect() - first_token = token_expiry_server["auth_payloads"][0]["token"] # Server closes -> reconnect hits code 40 -> should refresh token and retry await ws._websocket.close() - await asyncio.sleep(1.0) + await asyncio.sleep(0.5) assert ws.connected # 3 auth attempts: initial, rejected (code 40), successful retry assert len(token_expiry_server["auth_payloads"]) == 3 - # Token should have been refreshed (different from first) - third_token = token_expiry_server["auth_payloads"][2]["token"] - assert third_token != first_token await ws.disconnect() From 8741e5ebefc22df7ca8545f18fbed830d7cd2b65 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:47:52 +0400 Subject: [PATCH 09/39] chore: add [ws] optional dependency group for websockets Users install WS support with `pip install getstream[ws]`. Keeps websockets out of core deps since not all users need real-time events. --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 258d9bdc..6c98f710 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,9 @@ dependencies = [ ] [project.optional-dependencies] +ws = [ + "websockets>=15.0.1,<16", +] openai-realtime = [ "openai[realtime]>=1.93.0", ] From 327ca7eb6e87ea81117de265a766b0b5753d6505 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:52:47 +0400 Subject: [PATCH 10/39] docs: add internal README for WebSocket client usage and testing Covers connection lifecycle, reconnect strategy, event listening, configuration, and how to run both unit and manual integration tests. --- getstream/ws/README.md | 215 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 getstream/ws/README.md diff --git a/getstream/ws/README.md b/getstream/ws/README.md new file mode 100644 index 00000000..6e01f35a --- /dev/null +++ b/getstream/ws/README.md @@ -0,0 +1,215 @@ +# WebSocket Client (`getstream.ws`) + +General-purpose WebSocket client for consuming real-time events from Stream's coordinator API. Works with chat events, video events, custom events, etc. + +## Install + +``` +pip install getstream[ws] +``` + +## Quick start + +```python +import asyncio +from getstream import AsyncStream + +async def main(): + client = AsyncStream(api_key="your-key", api_secret="your-secret") + + ws = await client.connect_ws( + user_id="alice", + user_details={"id": "alice", "name": "Alice"}, + ) + + @ws.on("message.new") + async def on_message(event): + print(f"New message in {event['cid']}: {event['message']['text']}") + + @ws.on_wildcard("call.**") + async def on_call_event(event_type, event): + print(f"{event_type}: {event}") + + # keep running until interrupted + try: + await asyncio.Event().wait() + finally: + await ws.disconnect() + await client.aclose() + +asyncio.run(main()) +``` + +## Standalone usage (without AsyncStream) + +```python +from getstream.ws import StreamWS + +ws = StreamWS( + api_key="your-key", + api_secret="your-secret", + user_id="alice", + user_details={"id": "alice", "name": "Alice"}, +) +await ws.connect() +# ... register listeners, do work ... +await ws.disconnect() +``` + +## How it works + +### Connection lifecycle + +1. **URL construction** -- `base_url` scheme is swapped (`https` to `wss`, `http` to `ws`), path `/connect` is appended, and `api_key`, `stream-auth-type=jwt`, `X-Stream-Client` are added as query params. + +2. **Auth handshake** -- on WebSocket open, the client sends: + ```json + {"token": "", "user_details": {"id": "alice", ...}} + ``` + Server responds with `connection.ok` (success) or `connection.error` (failure). + +3. **Background tasks** -- after auth, two async tasks start: + - **Reader loop**: reads messages, parses JSON, emits events by their `type` field. + - **Heartbeat loop**: sends `{"type": "health.check", "client_id": ""}` every 25s (configurable). If no message is received for 35s (configurable), triggers reconnect. + +4. **Disconnect** -- cancels background tasks, closes the socket with code 1000. + +### Reconnection + +Triggered automatically when: +- The server closes the connection unexpectedly. +- The heartbeat detects no messages within `healthcheck_timeout`. +- A WebSocket error occurs while sending a heartbeat. + +Strategy: +- **Exponential backoff with jitter**: base 0.25s, doubles per attempt, capped at 5s, plus random jitter to prevent thundering herd. +- **Max retries**: 5 by default, then gives up and sets `connected = False`. +- **Token refresh**: if the server rejects with error code 40 (`TOKEN_EXPIRED`), the cached token is cleared and a fresh JWT is generated for the next attempt. + +### Event listening + +`StreamWS` extends `StreamAsyncIOEventEmitter` (pyee-based). Two ways to listen: + +```python +# Exact event type +@ws.on("message.new") +def handler(event): + ... + +# Wildcard patterns +@ws.on_wildcard("message.*") # single level: message.new, message.updated +def handler(event_type, event): + ... + +@ws.on_wildcard("call.**") # multi level: call.created, call.member.added +def handler(event_type, event): + ... + +@ws.on_wildcard("*") # all events +def handler(event_type, event): + ... +``` + +Both sync and async handlers are supported. + +## Configuration + +All constructor params (also accepted as kwargs to `client.connect_ws()`): + +| Param | Default | Description | +|-------|---------|-------------| +| `api_key` | required | Stream API key | +| `api_secret` | required | Stream API secret (used to generate JWT) | +| `user_id` | required | User ID to connect as | +| `user_details` | `{"id": user_id}` | User details sent during auth | +| `base_url` | `https://chat.stream-io-api.com` | Base API URL | +| `token` | auto-generated | Pre-generated JWT (skips auto-generation) | +| `user_agent` | `stream-python-client-{VERSION}` | Client identifier | +| `healthcheck_interval` | `25.0` | Seconds between heartbeat pings | +| `healthcheck_timeout` | `35.0` | Seconds of silence before reconnect | +| `max_retries` | `5` | Max reconnect attempts before giving up | +| `backoff_base` | `0.25` | Initial backoff delay in seconds | +| `backoff_max` | `5.0` | Maximum backoff delay in seconds | + +## Testing + +### Unit tests (no credentials needed) + +``` +uv run pytest tests/ws/ --timeout=10 +``` + +Tests use a local mock WebSocket server (`websockets.serve` on `127.0.0.1:0`) and cover: +- Instantiation and URL construction +- Auth handshake (success and failure) +- Event dispatch to typed and wildcard listeners +- Heartbeat message sending +- Auto-reconnect on server disconnect +- Token refresh on error code 40 +- `AsyncStream.connect_ws()` integration and `aclose()` cleanup + +### Manual integration test (requires credentials) + +```python +import asyncio +from getstream import AsyncStream + +async def main(): + client = AsyncStream(api_key="...", api_secret="...") + + # Create a user first + await client.upsert_users(UserRequest(id="test-ws-user")) + + # Connect WS as that user + ws = await client.connect_ws(user_id="test-ws-user") + + received = [] + @ws.on_wildcard("*") + def on_any(event_type, event): + print(f"[{event_type}] {event}") + received.append(event) + + # In another terminal or script, send a chat message via REST API + # to a channel the user is a member of. Verify the event appears here. + + await asyncio.sleep(30) + print(f"Received {len(received)} events") + + await ws.disconnect() + await client.aclose() + +asyncio.run(main()) +``` + +### Debugging + +Enable WebSocket-level debug logs: + +```python +import logging +logging.getLogger("getstream.ws.client").setLevel(logging.DEBUG) +logging.getLogger("websockets.client").setLevel(logging.DEBUG) +``` + +## Architecture + +``` +getstream/ws/ + __init__.py -- exports StreamWS, StreamWSAuthError + client.py -- StreamWS class (connection, auth, reader, heartbeat, reconnect) + +getstream/stream.py -- AsyncStream.connect_ws() convenience method + +Tests: + tests/ws/ + test_ws_client.py -- unit tests for StreamWS + test_async_stream_ws.py -- integration tests for AsyncStream.connect_ws() +``` + +Reference implementation: `stream-video-js/packages/client/src/coordinator/connection/connection.ts` + +## Known limitations + +- **Async only** -- WebSocket is inherently async. Sync users can call `stream.as_async()` to get an `AsyncStream`. +- **No `products` filter yet** -- the JS SDK sends `products: ["video"]` or `["chat"]` to filter events. This can be added to the auth payload when needed. +- **Single connection per `connect_ws()` call** -- each call creates a separate WS connection. No multiplexing. From 731664fc85866da23318535dea64dec2811c5a25 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:52:57 +0400 Subject: [PATCH 11/39] chore: update uv.lock for new [ws] optional dependency group --- uv.lock | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index 3bc04b64..3d5982be 100644 --- a/uv.lock +++ b/uv.lock @@ -930,6 +930,9 @@ webrtc = [ { name = "websocket-client" }, { name = "websockets" }, ] +ws = [ + { name = "websockets" }, +] [package.dev-dependencies] dev = [ @@ -986,8 +989,9 @@ requires-dist = [ { name = "websocket-client", marker = "extra == 'webrtc'", specifier = ">=1.8.0" }, { name = "websockets", marker = "extra == 'webrtc'", specifier = ">=15.0.1" }, { name = "websockets", marker = "extra == 'webrtc'", specifier = ">=15.0.1,<16" }, + { name = "websockets", marker = "extra == 'ws'", specifier = ">=15.0.1,<16" }, ] -provides-extras = ["openai-realtime", "telemetry", "webrtc"] +provides-extras = ["openai-realtime", "telemetry", "webrtc", "ws"] [package.metadata.requires-dev] dev = [ From ca86de1120aea59691d5ca597356908f7ee26e1e Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:56:31 +0400 Subject: [PATCH 12/39] fix: wrap health check message in array to match Stream protocol The JS reference sends health checks as [{type, client_id}] (array). Our plain object format could be silently ignored by the server. --- getstream/ws/client.py | 2 +- tests/ws/test_ws_client.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 8dd3f404..7b412580 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -171,7 +171,7 @@ async def _heartbeat_loop(self) -> None: break try: - msg = {"type": "health.check", "client_id": self._connection_id} + msg = [{"type": "health.check", "client_id": self._connection_id}] await self._websocket.send(json.dumps(msg)) logger.debug("Sent heartbeat") except websockets.exceptions.ConnectionClosed: diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index 455e6ed1..1249bd0b 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -142,12 +142,13 @@ async def test_heartbeat_sent(mock_server): # Wait long enough for at least 2 heartbeats await asyncio.sleep(0.35) + # Health checks are sent as arrays per the Stream protocol: [{type, client_id}] heartbeats = [ m for m in mock_server["client_messages"] - if m.get("type") == "health.check" + if isinstance(m, list) and m and m[0].get("type") == "health.check" ] assert len(heartbeats) >= 2 - assert heartbeats[0]["client_id"] == "conn-123" + assert heartbeats[0][0]["client_id"] == "conn-123" await ws.disconnect() From aa0fef4045a3f537e97f96ac053203a0d02d9b54 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 17:58:32 +0400 Subject: [PATCH 13/39] fix: reject non-connection.ok auth responses Previously any non-error response was accepted as successful auth. The server could send an unexpected message type and we'd silently treat it as connected. Now we strictly require connection.ok. --- getstream/ws/client.py | 7 +++++-- tests/ws/test_ws_client.py | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 7b412580..69ee3331 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -117,10 +117,13 @@ async def _open_connection(self) -> dict: raw = await self._websocket.recv() message = json.loads(raw) - if message.get("type") in ("error", "connection.error"): + msg_type = message.get("type") + if msg_type != "connection.ok": await self._websocket.close() self._websocket = None - raise StreamWSAuthError(f"Authentication failed: {message}", response=message) + raise StreamWSAuthError( + f"Expected connection.ok, got {msg_type}: {message}", response=message + ) self._connection_id = message.get("connection_id") self._last_received = time.monotonic() diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index 1249bd0b..fa516d09 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -5,7 +5,7 @@ import pytest_asyncio import websockets -from getstream.ws import StreamWS +from getstream.ws import StreamWS, StreamWSAuthError def test_stream_ws_instantiation(): @@ -153,6 +153,37 @@ async def test_heartbeat_sent(mock_server): await ws.disconnect() +@pytest_asyncio.fixture() +async def unexpected_response_server(): + """Server that sends an unexpected response type instead of connection.ok.""" + + async def handler(ws): + await ws.recv() + await ws.send(json.dumps({"type": "something.unexpected", "data": "foo"})) + try: + async for _ in ws: + pass + except websockets.exceptions.ConnectionClosed: + pass + + async with websockets.serve(handler, "127.0.0.1", 0) as server: + port = server.sockets[0].getsockname()[1] + yield {"port": port} + + +@pytest.mark.asyncio +async def test_reject_unexpected_auth_response(unexpected_response_server): + ws = StreamWS( + api_key="k", + api_secret="s" * 32, + user_id="alice", + base_url=f"http://127.0.0.1:{unexpected_response_server['port']}", + ) + with pytest.raises(StreamWSAuthError, match="Expected connection.ok"): + await ws.connect() + assert not ws.connected + + @pytest.mark.asyncio async def test_reconnect_on_server_close(mock_server): ws = StreamWS( From 1e0e9979290ef0f4171f16325caada89382c1844 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 18:02:16 +0400 Subject: [PATCH 14/39] fix: don't reconnect when server closes with code 1000 Code 1000 is an intentional close per the WS spec. Reconnecting on it wastes resources and fights the server's intent. Non-1000 codes still trigger automatic reconnect as before. --- getstream/ws/client.py | 11 ++++++++--- tests/ws/test_ws_client.py | 29 +++++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 69ee3331..d7542eb3 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -154,9 +154,14 @@ async def _reader_loop(self) -> None: message = json.loads(raw) event_type = message.get("type", "unknown") self.emit(event_type, message) - except websockets.exceptions.ConnectionClosed: - logger.debug("WebSocket connection closed in reader") - self._trigger_reconnect("connection closed") + except websockets.exceptions.ConnectionClosed as e: + close_code = getattr(e.rcvd, "code", None) if e.rcvd else None + logger.debug("WebSocket closed (code=%s) in reader", close_code) + if close_code == 1000: + logger.info("Server closed connection normally, not reconnecting") + self._connected = False + else: + self._trigger_reconnect("connection closed") break except json.JSONDecodeError: logger.warning("Failed to parse WebSocket message as JSON") diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index fa516d09..30558426 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -198,8 +198,8 @@ async def test_reconnect_on_server_close(mock_server): assert ws.connected assert len(mock_server["connections"]) == 1 - # Server forcibly closes the connection - await mock_server["connections"][0].close() + # Server closes with non-1000 code (abnormal) -- should trigger reconnect + await mock_server["connections"][0].close(code=1001, reason="going away") # Wait for reconnect await asyncio.sleep(0.5) @@ -212,6 +212,27 @@ async def test_reconnect_on_server_close(mock_server): await ws.disconnect() +@pytest.mark.asyncio +async def test_no_reconnect_on_intentional_close(mock_server): + ws = StreamWS( + api_key="k", + api_secret="s" * 32, + user_id="alice", + base_url=f"http://127.0.0.1:{mock_server['port']}", + healthcheck_interval=100, + max_retries=3, + ) + await ws.connect() + assert ws.connected + + # Server closes with code 1000 (intentional) -- should NOT reconnect + await mock_server["connections"][0].close(code=1000, reason="normal closure") + await asyncio.sleep(0.5) + + assert not ws.connected + assert len(mock_server["connections"]) == 1 + + @pytest_asyncio.fixture() async def token_expiry_server(): """Server that rejects the 2nd connection with code 40, then accepts the 3rd.""" @@ -271,8 +292,8 @@ async def test_token_refresh_on_expired(token_expiry_server): ) await ws.connect() - # Server closes -> reconnect hits code 40 -> should refresh token and retry - await ws._websocket.close() + # Simulate abnormal close -> reconnect hits code 40 -> should refresh token and retry + await ws._websocket.close(code=1001, reason="going away") await asyncio.sleep(0.5) assert ws.connected From 1eb60424b6c70b357af7d627c31392f97d37f9c3 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 18:26:34 +0400 Subject: [PATCH 15/39] fix: add wsID guard to ignore stale messages from old connections During reconnect, the old reader loop could still be processing messages from the previous socket. The ws_id counter increments on each connection and the reader exits if it detects a mismatch, preventing stale events from being emitted. --- getstream/ws/client.py | 11 ++++++++++- tests/ws/test_ws_client.py | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index d7542eb3..0ba268c7 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -61,6 +61,7 @@ def __init__( self._websocket: Optional[ClientConnection] = None self._connected = False self._connection_id: Optional[str] = None + self._ws_id: int = 0 self._reader_task: Optional[asyncio.Task] = None self._heartbeat_task: Optional[asyncio.Task] = None self._last_received: float = 0.0 @@ -89,6 +90,10 @@ def connected(self) -> bool: def connection_id(self) -> Optional[str]: return self._connection_id + @property + def ws_id(self) -> int: + return self._ws_id + def _ensure_token(self) -> str: if self._token: return self._token @@ -101,6 +106,7 @@ def _ensure_token(self) -> str: return self._token async def _open_connection(self) -> dict: + self._ws_id += 1 self._websocket = await websockets.connect( self.ws_url, ping_interval=None, @@ -147,9 +153,12 @@ def _trigger_reconnect(self, reason: str) -> None: self._reconnect_task = asyncio.create_task(self._reconnect(reason)) async def _reader_loop(self) -> None: - while self._connected and self._websocket: + my_ws_id = self._ws_id + while self._connected and self._websocket and self._ws_id == my_ws_id: try: raw = await self._websocket.recv() + if self._ws_id != my_ws_id: + break self._last_received = time.monotonic() message = json.loads(raw) event_type = message.get("type", "unknown") diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index 30558426..d586f667 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -197,6 +197,7 @@ async def test_reconnect_on_server_close(mock_server): await ws.connect() assert ws.connected assert len(mock_server["connections"]) == 1 + ws_id_before = ws.ws_id # Server closes with non-1000 code (abnormal) -- should trigger reconnect await mock_server["connections"][0].close(code=1001, reason="going away") @@ -208,6 +209,8 @@ async def test_reconnect_on_server_close(mock_server): # A second connection was made (the reconnect) assert len(mock_server["connections"]) == 2 assert len(mock_server["auth_payloads"]) == 2 + # ws_id should have incremented to guard against stale messages + assert ws.ws_id > ws_id_before await ws.disconnect() From 5e533585d4ae080fd69f0bfa7d78be4b0ff1e7ec Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 18:28:46 +0400 Subject: [PATCH 16/39] fix: don't overwrite user-provided static tokens on code 40 If the user explicitly passed a token, clearing it on TOKEN_EXPIRED would silently replace it with an auto-generated one. Now static tokens are treated as non-refreshable and code 40 becomes a fatal auth error, matching the JS SDK's isStatic() check. --- getstream/ws/client.py | 3 ++- tests/ws/test_ws_client.py | 24 ++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 0ba268c7..b3ea4132 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -52,6 +52,7 @@ def __init__( self._user_agent = user_agent or f"stream-python-client-{VERSION}" self._user_details = user_details or {"id": user_id} self._token = token + self._static_token = token is not None self._healthcheck_interval = healthcheck_interval self._healthcheck_timeout = healthcheck_timeout self._max_retries = max_retries @@ -232,7 +233,7 @@ async def _reconnect(self, reason: str) -> None: return except StreamWSAuthError as e: error_code = e.response.get("error", {}).get("code") - if error_code == TOKEN_EXPIRED_CODE: + if error_code == TOKEN_EXPIRED_CODE and not self._static_token: logger.info("Token expired (code 40), refreshing") self._token = None continue diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index d586f667..ca2a6bae 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -304,3 +304,27 @@ async def test_token_refresh_on_expired(token_expiry_server): assert len(token_expiry_server["auth_payloads"]) == 3 await ws.disconnect() + + +@pytest.mark.asyncio +async def test_static_token_not_refreshed(token_expiry_server): + """When a user provides a static token, code 40 should NOT refresh it.""" + ws = StreamWS( + api_key="k", + api_secret="s" * 32, + user_id="alice", + token="my-static-token", + base_url=f"http://127.0.0.1:{token_expiry_server['port']}", + healthcheck_interval=100, + max_retries=5, + backoff_base=0.05, + backoff_max=0.1, + ) + await ws.connect() + + await ws._websocket.close(code=1001, reason="going away") + await asyncio.sleep(0.5) + + # With a static token, code 40 should be treated as fatal auth error + # (can't refresh), so client should give up + assert not ws.connected From c0a82fab1e30f12e80c59d993244d016117bd431 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 18:37:20 +0400 Subject: [PATCH 17/39] style: apply formatter to ws client and tests --- getstream/ws/client.py | 7 ++++++- tests/ws/test_ws_client.py | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index b3ea4132..7ae4034a 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -220,7 +220,12 @@ async def _reconnect(self, reason: str) -> None: self._backoff_max, ) delay *= 1 + random.random() - logger.debug("Reconnect attempt %d/%d in %.2fs", attempt, self._max_retries, delay) + logger.debug( + "Reconnect attempt %d/%d in %.2fs", + attempt, + self._max_retries, + delay, + ) await asyncio.sleep(delay) if not self._connected: diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index ca2a6bae..493d1ee4 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -144,7 +144,8 @@ async def test_heartbeat_sent(mock_server): # Health checks are sent as arrays per the Stream protocol: [{type, client_id}] heartbeats = [ - m for m in mock_server["client_messages"] + m + for m in mock_server["client_messages"] if isinstance(m, list) and m and m[0].get("type") == "health.check" ] assert len(heartbeats) >= 2 From 28c78afedbec55ca2fbd87e6c47a7f35cad1fcbb Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 19:08:31 +0400 Subject: [PATCH 18/39] fix: use /api/v2/connect path for WebSocket endpoint The WS endpoint is at /api/v2/connect, not /connect. Using the wrong path caused the server to immediately close with 1000, which looked like a ConnectionClosedOK before auth could be sent. --- getstream/ws/client.py | 2 +- tests/ws/test_ws_client.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 7ae4034a..93f9c0ee 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -81,7 +81,7 @@ def ws_url(self) -> str: "X-Stream-Client": self._user_agent, } ) - return f"{scheme}/connect?{params}" + return f"{scheme}/api/v2/connect?{params}" @property def connected(self) -> bool: diff --git a/tests/ws/test_ws_client.py b/tests/ws/test_ws_client.py index 493d1ee4..a48fa0dd 100644 --- a/tests/ws/test_ws_client.py +++ b/tests/ws/test_ws_client.py @@ -27,7 +27,7 @@ def test_ws_url_construction(): base_url="https://chat.stream-io-api.com", ) url = ws.ws_url - assert url.startswith("wss://chat.stream-io-api.com/connect?") + assert url.startswith("wss://chat.stream-io-api.com/api/v2/connect?") assert "api_key=my-key" in url assert "stream-auth-type=jwt" in url @@ -40,7 +40,7 @@ def test_ws_url_construction_http(): base_url="http://localhost:3030", ) url = ws.ws_url - assert url.startswith("ws://localhost:3030/connect?") + assert url.startswith("ws://localhost:3030/api/v2/connect?") @pytest_asyncio.fixture() From a85ffcef32a006f5a8c1675fb7e4b785eb2ff5b4 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 19:09:33 +0400 Subject: [PATCH 19/39] fix: support decorator syntax for on_wildcard() on_wildcard("*", handler) worked but @ws.on_wildcard("*") did not, since listener was required. Now it supports both forms, consistent with pyee's .on() decorator pattern. --- getstream/utils/event_emitter.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/getstream/utils/event_emitter.py b/getstream/utils/event_emitter.py index af398362..be406d91 100644 --- a/getstream/utils/event_emitter.py +++ b/getstream/utils/event_emitter.py @@ -84,12 +84,26 @@ def emit(self, event, *args, **kwargs): return result - def on_wildcard(self, pattern, listener): - """Register a wildcard event listener""" - if pattern not in self._wildcard_listeners: - self._wildcard_listeners[pattern] = [] - self._wildcard_listeners[pattern].append(listener) - return self + def on_wildcard(self, pattern, listener=None): + """Register a wildcard event listener. + + Can be used directly or as a decorator: + ws.on_wildcard("*", handler) + + @ws.on_wildcard("*") + def handler(event_type, event): + ... + """ + def _register(fn): + if pattern not in self._wildcard_listeners: + self._wildcard_listeners[pattern] = [] + self._wildcard_listeners[pattern].append(fn) + return fn + + if listener is not None: + _register(listener) + return self + return _register def remove_wildcard_listener(self, pattern, listener): """Remove a specific wildcard listener""" From e0f64a8bbf83cd12dae539d2dcfd834e587c9a9c Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 20:02:19 +0400 Subject: [PATCH 20/39] fix: allow kwargs to override base_url and user_agent in connect_ws connect_ws passed base_url from self.base_url AND forwarded kwargs, causing a duplicate keyword argument error when callers tried to override base_url. Now kwargs take precedence over defaults. --- getstream/stream.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index 92f156e4..60543831 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -216,14 +216,17 @@ async def connect_ws( ): from getstream.ws import StreamWS + defaults = { + "base_url": self.base_url, + "user_agent": self.user_agent, + } + defaults.update(kwargs) ws = StreamWS( api_key=self.api_key, api_secret=self.api_secret, user_id=user_id, user_details=user_details or {"id": user_id}, - base_url=self.base_url, - user_agent=self.user_agent, - **kwargs, + **defaults, ) await ws.connect() self._ws_connections.append(ws) From d6b94eca3d93843d5221fc98e8e4c4e55662558e Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 20:13:01 +0400 Subject: [PATCH 21/39] style: apply ruff format and remove unused import --- getstream/utils/event_emitter.py | 1 + tests/ws/test_async_stream_ws.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/getstream/utils/event_emitter.py b/getstream/utils/event_emitter.py index be406d91..bc8a3905 100644 --- a/getstream/utils/event_emitter.py +++ b/getstream/utils/event_emitter.py @@ -94,6 +94,7 @@ def on_wildcard(self, pattern, listener=None): def handler(event_type, event): ... """ + def _register(fn): if pattern not in self._wildcard_listeners: self._wildcard_listeners[pattern] = [] diff --git a/tests/ws/test_async_stream_ws.py b/tests/ws/test_async_stream_ws.py index c3f7dd6c..bd395ea7 100644 --- a/tests/ws/test_async_stream_ws.py +++ b/tests/ws/test_async_stream_ws.py @@ -1,4 +1,3 @@ -import asyncio import json import pytest From 00fe0b665dd5460232cbe7d8a36ea994fec22912 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 20:15:17 +0400 Subject: [PATCH 22/39] docs: update WS README with findings from integration testing Adds call-watching section (client-side token requirement), corrects the WS endpoint path to /api/v2/connect, documents array-wrapped health checks, ws_id guard, close code behavior, static token protection, and on_wildcard decorator support. --- getstream/ws/README.md | 107 +++++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 47 deletions(-) diff --git a/getstream/ws/README.md b/getstream/ws/README.md index 6e01f35a..169b4b9f 100644 --- a/getstream/ws/README.md +++ b/getstream/ws/README.md @@ -1,6 +1,6 @@ # WebSocket Client (`getstream.ws`) -General-purpose WebSocket client for consuming real-time events from Stream's coordinator API. Works with chat events, video events, custom events, etc. +General-purpose WebSocket client for consuming real-time events from Stream's coordinator API. Works with chat events, video call events, custom events, etc. ## Install @@ -40,6 +40,45 @@ async def main(): asyncio.run(main()) ``` +## Watching a video call + +To receive events for a specific video call (custom events, participant joins/leaves, etc.), you must "watch" the call after connecting. The coordinator only delivers call-scoped events to connections that have subscribed. + +**Important:** The watch request must use a **client-side token** (JWT with `user_id`), not the server admin token. The coordinator checks `IsClientSide()` and silently skips the subscription for server tokens. + +```python +from getstream import AsyncStream +from getstream.utils import build_query_param + +client = AsyncStream(api_key="...", api_secret="...") + +# 1. Connect WS as the user +ws = await client.connect_ws(user_id="agent") + +# 2. Watch the call with a client-side token +user_token = client.create_token("agent") +user_client = AsyncStream( + api_key=client.api_key, + api_secret=client.api_secret, + base_url=client.base_url, +) +user_client.token = user_token +user_client.headers["authorization"] = user_token +user_client.client.headers["authorization"] = user_token + +await user_client.video.get( + "/api/v2/video/call/{type}/{id}", + path_params={"type": "default", "id": "my-call-id"}, + query_params=build_query_param(connection_id=ws.connection_id), +) +await user_client.aclose() + +# 3. Now call events arrive on the WS +@ws.on("custom") +def on_custom(event): + print(event["custom"]) +``` + ## Standalone usage (without AsyncStream) ```python @@ -60,31 +99,32 @@ await ws.disconnect() ### Connection lifecycle -1. **URL construction** -- `base_url` scheme is swapped (`https` to `wss`, `http` to `ws`), path `/connect` is appended, and `api_key`, `stream-auth-type=jwt`, `X-Stream-Client` are added as query params. +1. **URL construction** -- `base_url` scheme is swapped (`https` to `wss`, `http` to `ws`), path `/api/v2/connect` is appended, and `api_key`, `stream-auth-type=jwt`, `X-Stream-Client` are added as query params. 2. **Auth handshake** -- on WebSocket open, the client sends: ```json {"token": "", "user_details": {"id": "alice", ...}} ``` - Server responds with `connection.ok` (success) or `connection.error` (failure). + Server must respond with `connection.ok`. Any other response type (including `connection.error`) raises `StreamWSAuthError`. 3. **Background tasks** -- after auth, two async tasks start: - - **Reader loop**: reads messages, parses JSON, emits events by their `type` field. - - **Heartbeat loop**: sends `{"type": "health.check", "client_id": ""}` every 25s (configurable). If no message is received for 35s (configurable), triggers reconnect. + - **Reader loop**: reads messages, parses JSON, emits events by their `type` field. Uses a `ws_id` counter to ignore stale messages from old connections during reconnect. + - **Heartbeat loop**: sends `[{"type": "health.check", "client_id": ""}]` (array-wrapped per protocol) every 25s (configurable). If no message is received for 35s (configurable), triggers reconnect. -4. **Disconnect** -- cancels background tasks, closes the socket with code 1000. +4. **Disconnect** -- cancels all background tasks (reader, heartbeat, reconnect), closes the socket with code 1000. ### Reconnection Triggered automatically when: -- The server closes the connection unexpectedly. +- The server closes the connection with a non-1000 code (code 1000 = intentional close, no reconnect). - The heartbeat detects no messages within `healthcheck_timeout`. - A WebSocket error occurs while sending a heartbeat. Strategy: - **Exponential backoff with jitter**: base 0.25s, doubles per attempt, capped at 5s, plus random jitter to prevent thundering herd. - **Max retries**: 5 by default, then gives up and sets `connected = False`. -- **Token refresh**: if the server rejects with error code 40 (`TOKEN_EXPIRED`), the cached token is cleared and a fresh JWT is generated for the next attempt. +- **Token refresh**: if the server rejects with error code 40 (`TOKEN_EXPIRED`) and the token was auto-generated (not a static user-provided token), the cached token is cleared and a fresh JWT is generated for the next attempt. Static tokens are treated as non-refreshable. +- **ws_id guard**: each connection increments an internal counter. The reader loop exits if the counter changes, preventing stale messages from old connections being emitted after reconnect. ### Event listening @@ -96,7 +136,7 @@ Strategy: def handler(event): ... -# Wildcard patterns +# Wildcard patterns (decorator or direct call) @ws.on_wildcard("message.*") # single level: message.new, message.updated def handler(event_type, event): ... @@ -110,7 +150,7 @@ def handler(event_type, event): ... ``` -Both sync and async handlers are supported. +Both sync and async handlers are supported. `on_wildcard` works both as a decorator (`@ws.on_wildcard("*")`) and as a direct call (`ws.on_wildcard("*", handler)`). ## Configuration @@ -123,7 +163,7 @@ All constructor params (also accepted as kwargs to `client.connect_ws()`): | `user_id` | required | User ID to connect as | | `user_details` | `{"id": user_id}` | User details sent during auth | | `base_url` | `https://chat.stream-io-api.com` | Base API URL | -| `token` | auto-generated | Pre-generated JWT (skips auto-generation) | +| `token` | auto-generated | Pre-generated JWT (skips auto-generation, treated as static/non-refreshable) | | `user_agent` | `stream-python-client-{VERSION}` | Client identifier | | `healthcheck_interval` | `25.0` | Seconds between heartbeat pings | | `healthcheck_timeout` | `35.0` | Seconds of silence before reconnect | @@ -131,6 +171,8 @@ All constructor params (also accepted as kwargs to `client.connect_ws()`): | `backoff_base` | `0.25` | Initial backoff delay in seconds | | `backoff_max` | `5.0` | Maximum backoff delay in seconds | +Note: `connect_ws()` kwargs override the defaults from `AsyncStream` (e.g. you can pass a different `base_url`). + ## Testing ### Unit tests (no credentials needed) @@ -141,46 +183,16 @@ uv run pytest tests/ws/ --timeout=10 Tests use a local mock WebSocket server (`websockets.serve` on `127.0.0.1:0`) and cover: - Instantiation and URL construction -- Auth handshake (success and failure) +- Auth handshake (success, failure, and unexpected response types) - Event dispatch to typed and wildcard listeners -- Heartbeat message sending -- Auto-reconnect on server disconnect +- Heartbeat message sending (array format) +- Auto-reconnect on server disconnect (non-1000 close codes) +- No reconnect on intentional close (code 1000) - Token refresh on error code 40 +- Static token protection (no refresh for user-provided tokens) +- ws_id stale message guard - `AsyncStream.connect_ws()` integration and `aclose()` cleanup -### Manual integration test (requires credentials) - -```python -import asyncio -from getstream import AsyncStream - -async def main(): - client = AsyncStream(api_key="...", api_secret="...") - - # Create a user first - await client.upsert_users(UserRequest(id="test-ws-user")) - - # Connect WS as that user - ws = await client.connect_ws(user_id="test-ws-user") - - received = [] - @ws.on_wildcard("*") - def on_any(event_type, event): - print(f"[{event_type}] {event}") - received.append(event) - - # In another terminal or script, send a chat message via REST API - # to a channel the user is a member of. Verify the event appears here. - - await asyncio.sleep(30) - print(f"Received {len(received)} events") - - await ws.disconnect() - await client.aclose() - -asyncio.run(main()) -``` - ### Debugging Enable WebSocket-level debug logs: @@ -213,3 +225,4 @@ Reference implementation: `stream-video-js/packages/client/src/coordinator/conne - **Async only** -- WebSocket is inherently async. Sync users can call `stream.as_async()` to get an `AsyncStream`. - **No `products` filter yet** -- the JS SDK sends `products: ["video"]` or `["chat"]` to filter events. This can be added to the auth payload when needed. - **Single connection per `connect_ws()` call** -- each call creates a separate WS connection. No multiplexing. +- **Call watching requires manual setup** -- receiving call-scoped events requires a separate REST call with a client-side token (see "Watching a video call" above). This could be wrapped in a helper method in the future. From d68b9de272760272597cb318bf4443b45052d8b3 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 21:47:39 +0400 Subject: [PATCH 23/39] fix: prevent leaked exceptions in reconnect and disconnect cleanup _reconnect() raised StreamWSAuthError from a fire-and-forget task, causing "Task exception was never retrieved" warnings. Now it logs and returns instead. _cancel_tasks() only caught CancelledError but a finished task could hold other exceptions (e.g. from a failed reconnect), breaking disconnect/aclose. Now catches Exception too. --- getstream/ws/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 93f9c0ee..4026b4a2 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -242,9 +242,9 @@ async def _reconnect(self, reason: str) -> None: logger.info("Token expired (code 40), refreshing") self._token = None continue - logger.error("Auth failed during reconnect") + logger.error("Auth failed during reconnect (code %s)", error_code) self._connected = False - raise + return except Exception as e: logger.warning("Reconnect attempt %d failed: %s", attempt, e) @@ -259,7 +259,7 @@ async def _cancel_tasks(self) -> None: task.cancel() try: await task - except asyncio.CancelledError: + except (asyncio.CancelledError, Exception): pass self._reader_task = None self._heartbeat_task = None From 5b1acabdd0d2a2aaae85eb2131fc87b5a996d9b6 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 21:53:16 +0400 Subject: [PATCH 24/39] fix: prevent reconnect from cancelling itself via _cancel_tasks _reconnect() called _cancel_tasks() which cancelled _reconnect_task (the currently running task), aborting retries before they could complete. Split into _cancel_background_tasks (reader + heartbeat, safe from reconnect) and _cancel_all_tasks (includes reconnect, used by disconnect). --- getstream/ws/client.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 4026b4a2..c9df0b45 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -204,7 +204,7 @@ async def _reconnect(self, reason: str) -> None: logger.info("Reconnecting: %s", reason) try: - await self._cancel_tasks() + await self._cancel_background_tasks() if self._websocket: try: await self._websocket.close() @@ -253,8 +253,9 @@ async def _reconnect(self, reason: str) -> None: finally: self._reconnecting = False - async def _cancel_tasks(self) -> None: - for task in (self._reader_task, self._heartbeat_task, self._reconnect_task): + async def _cancel_background_tasks(self) -> None: + """Cancel reader and heartbeat tasks (safe to call from _reconnect).""" + for task in (self._reader_task, self._heartbeat_task): if task: task.cancel() try: @@ -263,12 +264,22 @@ async def _cancel_tasks(self) -> None: pass self._reader_task = None self._heartbeat_task = None - self._reconnect_task = None + + async def _cancel_all_tasks(self) -> None: + """Cancel all tasks including reconnect (used by disconnect).""" + await self._cancel_background_tasks() + if self._reconnect_task: + self._reconnect_task.cancel() + try: + await self._reconnect_task + except (asyncio.CancelledError, Exception): + pass + self._reconnect_task = None async def disconnect(self) -> None: self._connected = False self._connection_id = None - await self._cancel_tasks() + await self._cancel_all_tasks() if self._websocket: try: await self._websocket.close(code=1000, reason="client disconnect") From 43d059d8ca6906135fbec05ee49e16e24635ed37 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 10 Apr 2026 22:01:45 +0400 Subject: [PATCH 25/39] fix: clean up stale tasks on connect and guard emit from listener errors connect() now cleans up any prior tasks and socket state directly, instead of calling disconnect(). This prevents leaked heartbeat tasks from a prior normal close (code 1000) from running against a new socket, without the awkward disconnect-before-connect pattern. Also wraps self.emit() in try/except so a failing sync listener doesn't kill the reader loop and silently drop all subsequent events. --- getstream/ws/client.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index c9df0b45..74afe97f 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -137,8 +137,16 @@ async def _open_connection(self) -> dict: return message async def connect(self) -> dict: - if self._connected: - await self.disconnect() + # Clean up any prior state (stale tasks from a previous connection + # may still be running even if _connected is already false) + await self._cancel_all_tasks() + if self._websocket: + try: + await self._websocket.close() + except Exception: + pass + self._websocket = None + self._connection_id = None message = await self._open_connection() self._connected = True @@ -163,7 +171,10 @@ async def _reader_loop(self) -> None: self._last_received = time.monotonic() message = json.loads(raw) event_type = message.get("type", "unknown") - self.emit(event_type, message) + try: + self.emit(event_type, message) + except Exception: + logger.exception("Error in event listener for %s", event_type) except websockets.exceptions.ConnectionClosed as e: close_code = getattr(e.rcvd, "code", None) if e.rcvd else None logger.debug("WebSocket closed (code=%s) in reader", close_code) From 6dd967314875b501ad8e48312e364d573385a9f1 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 11 Apr 2026 00:19:38 +0400 Subject: [PATCH 26/39] fix: prevent duplicate reconnect tasks and socket leaks on auth failure _trigger_reconnect now sets _reconnecting=True before scheduling the task, so two failures in the same event loop tick can't both spawn reconnect tasks. The redundant guard inside _reconnect is removed. _open_connection now closes the socket on any handshake failure (not just wrong response type), preventing leaked connections from recv() or json.loads() errors. --- getstream/ws/client.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 74afe97f..0f0f3c8f 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -115,22 +115,29 @@ async def _open_connection(self) -> dict: close_timeout=1.0, ) - auth_payload = { - "token": self._ensure_token(), - "user_details": self._user_details, - } - await self._websocket.send(json.dumps(auth_payload)) - - raw = await self._websocket.recv() - message = json.loads(raw) - - msg_type = message.get("type") - if msg_type != "connection.ok": - await self._websocket.close() + try: + auth_payload = { + "token": self._ensure_token(), + "user_details": self._user_details, + } + await self._websocket.send(json.dumps(auth_payload)) + + raw = await self._websocket.recv() + message = json.loads(raw) + + msg_type = message.get("type") + if msg_type != "connection.ok": + raise StreamWSAuthError( + f"Expected connection.ok, got {msg_type}: {message}", + response=message, + ) + except Exception: + try: + await self._websocket.close() + except Exception: + pass self._websocket = None - raise StreamWSAuthError( - f"Expected connection.ok, got {msg_type}: {message}", response=message - ) + raise self._connection_id = message.get("connection_id") self._last_received = time.monotonic() @@ -159,6 +166,7 @@ def _start_tasks(self) -> None: def _trigger_reconnect(self, reason: str) -> None: if self._connected and not self._reconnecting: + self._reconnecting = True self._reconnect_task = asyncio.create_task(self._reconnect(reason)) async def _reader_loop(self) -> None: @@ -209,9 +217,6 @@ async def _heartbeat_loop(self) -> None: break async def _reconnect(self, reason: str) -> None: - if self._reconnecting: - return - self._reconnecting = True logger.info("Reconnecting: %s", reason) try: From e2fb598ee35be30e62fc9dcaa0c180bd93f4368e Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 11 Apr 2026 00:25:30 +0400 Subject: [PATCH 27/39] refactor: reuse BASE_URL from getstream.stream instead of duplicating --- getstream/ws/client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 0f0f3c8f..3e05686f 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -10,13 +10,12 @@ import websockets from websockets import ClientConnection +from getstream.stream import BASE_URL from getstream.utils.event_emitter import StreamAsyncIOEventEmitter from getstream.version import VERSION logger = logging.getLogger(__name__) -DEFAULT_BASE_URL = "https://chat.stream-io-api.com" - TOKEN_EXPIRED_CODE = 40 @@ -34,7 +33,7 @@ def __init__( api_secret: str, user_id: str, *, - base_url: str = DEFAULT_BASE_URL, + base_url: str = BASE_URL, user_agent: str | None = None, user_details: Optional[dict] = None, token: Optional[str] = None, From afcbe25ab73f1cd18148c633b0c8e0dabc4dead6 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 11 Apr 2026 00:45:24 +0400 Subject: [PATCH 28/39] refactor: extract _close_websocket helper to deduplicate socket cleanup Four places did "if websocket, try close, ignore error, set None." Consolidated into _close_websocket(code, reason) with debug logging on failure. --- getstream/ws/client.py | 38 ++++++++++++++------------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 3e05686f..c6bd9e55 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -94,6 +94,16 @@ def connection_id(self) -> Optional[str]: def ws_id(self) -> int: return self._ws_id + async def _close_websocket( + self, code: int = 1000, reason: str = "" + ) -> None: + if self._websocket: + try: + await self._websocket.close(code=code, reason=reason) + except Exception: + logger.debug("Error closing WebSocket", exc_info=True) + self._websocket = None + def _ensure_token(self) -> str: if self._token: return self._token @@ -131,11 +141,7 @@ async def _open_connection(self) -> dict: response=message, ) except Exception: - try: - await self._websocket.close() - except Exception: - pass - self._websocket = None + await self._close_websocket() raise self._connection_id = message.get("connection_id") @@ -146,12 +152,7 @@ async def connect(self) -> dict: # Clean up any prior state (stale tasks from a previous connection # may still be running even if _connected is already false) await self._cancel_all_tasks() - if self._websocket: - try: - await self._websocket.close() - except Exception: - pass - self._websocket = None + await self._close_websocket() self._connection_id = None message = await self._open_connection() @@ -220,12 +221,7 @@ async def _reconnect(self, reason: str) -> None: try: await self._cancel_background_tasks() - if self._websocket: - try: - await self._websocket.close() - except Exception: - pass - self._websocket = None + await self._close_websocket() for attempt in range(1, self._max_retries + 1): if not self._connected: @@ -295,10 +291,4 @@ async def disconnect(self) -> None: self._connected = False self._connection_id = None await self._cancel_all_tasks() - if self._websocket: - try: - await self._websocket.close(code=1000, reason="client disconnect") - except Exception: - pass - finally: - self._websocket = None + await self._close_websocket(code=1000, reason="client disconnect") From 47acdcae3d140ca00729796bf670a99c943b9508 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 11 Apr 2026 00:50:28 +0400 Subject: [PATCH 29/39] refactor: use urlparse/urlunparse for WS URL construction String replacement for scheme swapping was fragile. urlparse properly handles the URL components and _replace makes the intent clear. --- getstream/ws/client.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index c6bd9e55..92c51241 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -4,7 +4,7 @@ import random import time from typing import Optional -from urllib.parse import urlencode +from urllib.parse import urlencode, urlparse, urlunparse import jwt import websockets @@ -70,17 +70,21 @@ def __init__( @property def ws_url(self) -> str: - scheme = self._base_url.replace("https://", "wss://").replace( - "http://", "ws://" - ) - params = urlencode( + parsed = urlparse(self._base_url) + ws_scheme = "wss" if parsed.scheme == "https" else "ws" + query = urlencode( { "api_key": self.api_key, "stream-auth-type": "jwt", "X-Stream-Client": self._user_agent, } ) - return f"{scheme}/api/v2/connect?{params}" + ws_parsed = parsed._replace( + scheme=ws_scheme, + path="/api/v2/connect", + query=query, + ) + return urlunparse(ws_parsed) @property def connected(self) -> bool: From 698ad74fc77d4fb836f9c5ae86e049862dc61d74 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sun, 12 Apr 2026 13:53:58 +0400 Subject: [PATCH 30/39] fix: prevent StreamWS leak on connect failure and prune stale connections If ws.connect() failed (e.g. auth error), the StreamWS object was abandoned without cleanup, leaking the websocket. Now disconnect() is called before re-raising. Also prune disconnected WS instances from _ws_connections on each connect_ws() call to prevent unbounded list growth. --- getstream/stream.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/getstream/stream.py b/getstream/stream.py index 60543831..dc22a249 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -228,7 +228,13 @@ async def connect_ws( user_details=user_details or {"id": user_id}, **defaults, ) - await ws.connect() + try: + await ws.connect() + except Exception: + await ws.disconnect() + raise + # Remove any previously disconnected WS instances to prevent unbounded growth + self._ws_connections[:] = [w for w in self._ws_connections if w.connected] self._ws_connections.append(ws) return ws From c256d1c1128e04084e6022819bca232d799b5fac Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 10:06:00 +0400 Subject: [PATCH 31/39] revert: remove on_wildcard decorator support from this PR The decorator syntax for on_wildcard() is a change to the event emitter, not the WS client. Should be a separate PR. --- getstream/utils/event_emitter.py | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/getstream/utils/event_emitter.py b/getstream/utils/event_emitter.py index bc8a3905..af398362 100644 --- a/getstream/utils/event_emitter.py +++ b/getstream/utils/event_emitter.py @@ -84,27 +84,12 @@ def emit(self, event, *args, **kwargs): return result - def on_wildcard(self, pattern, listener=None): - """Register a wildcard event listener. - - Can be used directly or as a decorator: - ws.on_wildcard("*", handler) - - @ws.on_wildcard("*") - def handler(event_type, event): - ... - """ - - def _register(fn): - if pattern not in self._wildcard_listeners: - self._wildcard_listeners[pattern] = [] - self._wildcard_listeners[pattern].append(fn) - return fn - - if listener is not None: - _register(listener) - return self - return _register + def on_wildcard(self, pattern, listener): + """Register a wildcard event listener""" + if pattern not in self._wildcard_listeners: + self._wildcard_listeners[pattern] = [] + self._wildcard_listeners[pattern].append(listener) + return self def remove_wildcard_listener(self, pattern, listener): """Remove a specific wildcard listener""" From 7786f5a71f24f7001a0f69713a18dfda052406f3 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 10:11:17 +0400 Subject: [PATCH 32/39] docs: update README to use on_wildcard direct call form on_wildcard decorator syntax was reverted from this PR, so examples should use the direct call form ws.on_wildcard("pattern", handler). --- getstream/ws/README.md | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/getstream/ws/README.md b/getstream/ws/README.md index 169b4b9f..223861ba 100644 --- a/getstream/ws/README.md +++ b/getstream/ws/README.md @@ -26,9 +26,9 @@ async def main(): async def on_message(event): print(f"New message in {event['cid']}: {event['message']['text']}") - @ws.on_wildcard("call.**") async def on_call_event(event_type, event): print(f"{event_type}: {event}") + ws.on_wildcard("call.**", on_call_event) # keep running until interrupted try: @@ -131,26 +131,21 @@ Strategy: `StreamWS` extends `StreamAsyncIOEventEmitter` (pyee-based). Two ways to listen: ```python -# Exact event type +# Exact event type (supports decorator syntax via pyee) @ws.on("message.new") def handler(event): ... -# Wildcard patterns (decorator or direct call) -@ws.on_wildcard("message.*") # single level: message.new, message.updated -def handler(event_type, event): - ... - -@ws.on_wildcard("call.**") # multi level: call.created, call.member.added -def handler(event_type, event): - ... +# Wildcard patterns (direct call) +ws.on_wildcard("message.*", handler) # single level: message.new, message.updated +ws.on_wildcard("call.**", handler) # multi level: call.created, call.member.added +ws.on_wildcard("*", handler) # all events -@ws.on_wildcard("*") # all events def handler(event_type, event): ... ``` -Both sync and async handlers are supported. `on_wildcard` works both as a decorator (`@ws.on_wildcard("*")`) and as a direct call (`ws.on_wildcard("*", handler)`). +Both sync and async handlers are supported. ## Configuration From 39769ba4c729561488909c718cd98ca60c067b7b Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 10:17:26 +0400 Subject: [PATCH 33/39] feat: make StreamWS an async context manager StreamWS now supports async with for automatic connect/disconnect: async with client.connect_ws(user_id="alice") as ws: ws.on("custom", handler) connect_ws() is now sync (returns unconnected StreamWS), the context manager handles the lifecycle. Standalone usage also works: async with StreamWS(...) as ws: ... --- getstream/stream.py | 14 ++++++-------- getstream/ws/client.py | 8 ++++++++ tests/ws/test_async_stream_ws.py | 33 +++++++++++++++++++++++--------- 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index dc22a249..b89a0c0f 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -208,12 +208,17 @@ def moderation(self) -> AsyncModerationClient: user_agent=self.user_agent, ) - async def connect_ws( + def connect_ws( self, user_id: str, user_details: Optional[dict] = None, **kwargs, ): + """Create a StreamWS instance. Use as an async context manager: + + async with client.connect_ws(user_id="alice") as ws: + ws.on("custom", handler) + """ from getstream.ws import StreamWS defaults = { @@ -228,13 +233,6 @@ async def connect_ws( user_details=user_details or {"id": user_id}, **defaults, ) - try: - await ws.connect() - except Exception: - await ws.disconnect() - raise - # Remove any previously disconnected WS instances to prevent unbounded growth - self._ws_connections[:] = [w for w in self._ws_connections if w.connected] self._ws_connections.append(ws) return ws diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 92c51241..968fd592 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -152,6 +152,14 @@ async def _open_connection(self) -> dict: self._last_received = time.monotonic() return message + async def __aenter__(self): + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.disconnect() + return False + async def connect(self) -> dict: # Clean up any prior state (stale tasks from a previous connection # may still be running even if _connected is already false) diff --git a/tests/ws/test_async_stream_ws.py b/tests/ws/test_async_stream_ws.py index bd395ea7..410e4cc5 100644 --- a/tests/ws/test_async_stream_ws.py +++ b/tests/ws/test_async_stream_ws.py @@ -37,19 +37,18 @@ async def handler(ws): @pytest.mark.asyncio -async def test_connect_ws(mock_server): +async def test_connect_ws_context_manager(mock_server): client = AsyncStream( api_key="k" * 32, api_secret="s" * 32, base_url=f"http://127.0.0.1:{mock_server['port']}", ) - ws = await client.connect_ws(user_id="alice") + async with client.connect_ws(user_id="alice") as ws: + assert ws.connected + assert ws.connection_id == "conn-int" + assert ws.user_id == "alice" - assert ws.connected - assert ws.connection_id == "conn-int" - assert ws.user_id == "alice" - - await ws.disconnect() + assert not ws.connected await client.aclose() @@ -60,8 +59,24 @@ async def test_aclose_disconnects_ws(mock_server): api_secret="s" * 32, base_url=f"http://127.0.0.1:{mock_server['port']}", ) - ws = await client.connect_ws(user_id="bob") - assert ws.connected + async with client.connect_ws(user_id="bob") as ws: + assert ws.connected await client.aclose() assert not ws.connected + + +@pytest.mark.asyncio +async def test_standalone_context_manager(mock_server): + from getstream.ws import StreamWS + + async with StreamWS( + api_key="k" * 32, + api_secret="s" * 32, + user_id="alice", + base_url=f"http://127.0.0.1:{mock_server['port']}", + ) as ws: + assert ws.connected + assert ws.connection_id == "conn-int" + + assert not ws.connected From 758ff4ed5db40e0bd6017dd04579c9f3335b0e48 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 10:19:57 +0400 Subject: [PATCH 34/39] refactor: remove implicit cleanup from connect() connect() should connect, not silently clean up prior state. If the caller forgot to disconnect, that's a bug in their code, not something we should paper over. The context manager handles lifecycle correctly. --- getstream/ws/client.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 968fd592..a658361a 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -161,11 +161,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): return False async def connect(self) -> dict: - # Clean up any prior state (stale tasks from a previous connection - # may still be running even if _connected is already false) - await self._cancel_all_tasks() - await self._close_websocket() - self._connection_id = None + if self._connected: + raise RuntimeError("Already connected. Call disconnect() first.") message = await self._open_connection() self._connected = True From a605e2721c899f12310e122fe28783aa62857286 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 10:28:15 +0400 Subject: [PATCH 35/39] refactor: move BASE_URL to config.py and remove lazy import BASE_URL was defined in stream.py, forcing ws/client.py to import from there and creating a circular dependency that required a lazy import in connect_ws(). Moving BASE_URL to config.py breaks the cycle so StreamWS can be imported at module level in stream.py. --- getstream/config.py | 2 ++ getstream/stream.py | 7 ++----- getstream/ws/client.py | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/getstream/config.py b/getstream/config.py index 486205ec..18a3742b 100644 --- a/getstream/config.py +++ b/getstream/config.py @@ -1,5 +1,7 @@ from getstream.version import VERSION +BASE_URL = "https://chat.stream-io-api.com/" + class BaseConfig: def __init__( diff --git a/getstream/stream.py b/getstream/stream.py index b89a0c0f..804e1e8e 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -18,15 +18,14 @@ from getstream.models import FullUserResponse, UserRequest from getstream.moderation.client import ModerationClient from getstream.moderation.async_client import ModerationClient as AsyncModerationClient +from getstream.config import BASE_URL from getstream.utils import validate_and_clean_url from getstream.video.client import VideoClient from getstream.video.async_client import VideoClient as AsyncVideoClient +from getstream.ws import StreamWS from typing_extensions import deprecated -BASE_URL = "https://chat.stream-io-api.com/" - - class Settings(BaseSettings): # Env names: STREAM_API_KEY, STREAM_API_SECRET, STREAM_BASE_URL, STREAM_TIMEOUT api_key: str @@ -219,8 +218,6 @@ def connect_ws( async with client.connect_ws(user_id="alice") as ws: ws.on("custom", handler) """ - from getstream.ws import StreamWS - defaults = { "base_url": self.base_url, "user_agent": self.user_agent, diff --git a/getstream/ws/client.py b/getstream/ws/client.py index a658361a..128360d8 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -10,7 +10,7 @@ import websockets from websockets import ClientConnection -from getstream.stream import BASE_URL +from getstream.config import BASE_URL from getstream.utils.event_emitter import StreamAsyncIOEventEmitter from getstream.version import VERSION From 23a04075c14088e54d5a8b04b5b43be651d92b19 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 10:31:45 +0400 Subject: [PATCH 36/39] style: apply ruff format --- getstream/stream.py | 4 ++-- getstream/ws/client.py | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index 804e1e8e..1ac22c62 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -215,8 +215,8 @@ def connect_ws( ): """Create a StreamWS instance. Use as an async context manager: - async with client.connect_ws(user_id="alice") as ws: - ws.on("custom", handler) + async with client.connect_ws(user_id="alice") as ws: + ws.on("custom", handler) """ defaults = { "base_url": self.base_url, diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 128360d8..1d23766d 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -98,9 +98,7 @@ def connection_id(self) -> Optional[str]: def ws_id(self) -> int: return self._ws_id - async def _close_websocket( - self, code: int = 1000, reason: str = "" - ) -> None: + async def _close_websocket(self, code: int = 1000, reason: str = "") -> None: if self._websocket: try: await self._websocket.close(code=code, reason=reason) From cd16d4b0f72e48817ee8436b6444c22dd49d71d7 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 10:56:32 +0400 Subject: [PATCH 37/39] feat: add watch_call parameter and DRY stream headers StreamWS now accepts watch_call=(call_type, call_id) which subscribes the WS connection to a specific call's events during __aenter__. Uses the user token and httpx to call the coordinator, encapsulating the subscription logic that previously had to be done externally. Also extracts _stream_headers property to deduplicate the shared auth/client-id headers used in both WS URL and REST subscription. --- getstream/ws/client.py | 46 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index 1d23766d..b6df7158 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -3,9 +3,10 @@ import logging import random import time -from typing import Optional +from typing import Optional, Tuple from urllib.parse import urlencode, urlparse, urlunparse +import httpx import jwt import websockets from websockets import ClientConnection @@ -42,6 +43,7 @@ def __init__( max_retries: int = 5, backoff_base: float = 0.25, backoff_max: float = 5.0, + watch_call: Optional[Tuple[str, str]] = None, ): super().__init__() self.api_key = api_key @@ -57,6 +59,7 @@ def __init__( self._max_retries = max_retries self._backoff_base = backoff_base self._backoff_max = backoff_max + self._watch_call = watch_call self._websocket: Optional[ClientConnection] = None self._connected = False @@ -68,17 +71,18 @@ def __init__( self._reconnecting = False self._reconnect_task: Optional[asyncio.Task] = None + @property + def _stream_headers(self) -> dict: + return { + "stream-auth-type": "jwt", + "X-Stream-Client": self._user_agent, + } + @property def ws_url(self) -> str: parsed = urlparse(self._base_url) ws_scheme = "wss" if parsed.scheme == "https" else "ws" - query = urlencode( - { - "api_key": self.api_key, - "stream-auth-type": "jwt", - "X-Stream-Client": self._user_agent, - } - ) + query = urlencode({"api_key": self.api_key, **self._stream_headers}) ws_parsed = parsed._replace( scheme=ws_scheme, path="/api/v2/connect", @@ -152,8 +156,34 @@ async def _open_connection(self) -> dict: async def __aenter__(self): await self.connect() + if self._watch_call: + call_type, call_id = self._watch_call + await self._subscribe_to_call(call_type, call_id) return self + async def _subscribe_to_call(self, call_type: str, call_id: str) -> None: + """Subscribe to call events by 'watching' the call with our connection_id. + + Uses the user token (not admin token) because the coordinator only + registers subscriptions for client-side requests. + """ + token = self._ensure_token() + parsed = urlparse(self._base_url) + url = urlunparse( + parsed._replace(path=f"/api/v2/video/call/{call_type}/{call_id}") + ) + params = {"api_key": self.api_key, "connection_id": self._connection_id} + headers = {"authorization": token, **self._stream_headers} + async with httpx.AsyncClient() as http: + response = await http.get(url, params=params, headers=headers) + response.raise_for_status() + logger.info( + "Watching call %s/%s (connection_id=%s)", + call_type, + call_id, + self._connection_id, + ) + async def __aexit__(self, exc_type, exc_val, exc_tb): await self.disconnect() return False From 849acf317c59ed4e79ceb4e905eefa2070e54c9f Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 11:01:22 +0400 Subject: [PATCH 38/39] docs: move WS documentation to PR description, remove README.md The internal README duplicated what belongs in the PR description. Single source of truth for the implementation docs. --- getstream/ws/README.md | 223 ----------------------------------------- 1 file changed, 223 deletions(-) delete mode 100644 getstream/ws/README.md diff --git a/getstream/ws/README.md b/getstream/ws/README.md deleted file mode 100644 index 223861ba..00000000 --- a/getstream/ws/README.md +++ /dev/null @@ -1,223 +0,0 @@ -# WebSocket Client (`getstream.ws`) - -General-purpose WebSocket client for consuming real-time events from Stream's coordinator API. Works with chat events, video call events, custom events, etc. - -## Install - -``` -pip install getstream[ws] -``` - -## Quick start - -```python -import asyncio -from getstream import AsyncStream - -async def main(): - client = AsyncStream(api_key="your-key", api_secret="your-secret") - - ws = await client.connect_ws( - user_id="alice", - user_details={"id": "alice", "name": "Alice"}, - ) - - @ws.on("message.new") - async def on_message(event): - print(f"New message in {event['cid']}: {event['message']['text']}") - - async def on_call_event(event_type, event): - print(f"{event_type}: {event}") - ws.on_wildcard("call.**", on_call_event) - - # keep running until interrupted - try: - await asyncio.Event().wait() - finally: - await ws.disconnect() - await client.aclose() - -asyncio.run(main()) -``` - -## Watching a video call - -To receive events for a specific video call (custom events, participant joins/leaves, etc.), you must "watch" the call after connecting. The coordinator only delivers call-scoped events to connections that have subscribed. - -**Important:** The watch request must use a **client-side token** (JWT with `user_id`), not the server admin token. The coordinator checks `IsClientSide()` and silently skips the subscription for server tokens. - -```python -from getstream import AsyncStream -from getstream.utils import build_query_param - -client = AsyncStream(api_key="...", api_secret="...") - -# 1. Connect WS as the user -ws = await client.connect_ws(user_id="agent") - -# 2. Watch the call with a client-side token -user_token = client.create_token("agent") -user_client = AsyncStream( - api_key=client.api_key, - api_secret=client.api_secret, - base_url=client.base_url, -) -user_client.token = user_token -user_client.headers["authorization"] = user_token -user_client.client.headers["authorization"] = user_token - -await user_client.video.get( - "/api/v2/video/call/{type}/{id}", - path_params={"type": "default", "id": "my-call-id"}, - query_params=build_query_param(connection_id=ws.connection_id), -) -await user_client.aclose() - -# 3. Now call events arrive on the WS -@ws.on("custom") -def on_custom(event): - print(event["custom"]) -``` - -## Standalone usage (without AsyncStream) - -```python -from getstream.ws import StreamWS - -ws = StreamWS( - api_key="your-key", - api_secret="your-secret", - user_id="alice", - user_details={"id": "alice", "name": "Alice"}, -) -await ws.connect() -# ... register listeners, do work ... -await ws.disconnect() -``` - -## How it works - -### Connection lifecycle - -1. **URL construction** -- `base_url` scheme is swapped (`https` to `wss`, `http` to `ws`), path `/api/v2/connect` is appended, and `api_key`, `stream-auth-type=jwt`, `X-Stream-Client` are added as query params. - -2. **Auth handshake** -- on WebSocket open, the client sends: - ```json - {"token": "", "user_details": {"id": "alice", ...}} - ``` - Server must respond with `connection.ok`. Any other response type (including `connection.error`) raises `StreamWSAuthError`. - -3. **Background tasks** -- after auth, two async tasks start: - - **Reader loop**: reads messages, parses JSON, emits events by their `type` field. Uses a `ws_id` counter to ignore stale messages from old connections during reconnect. - - **Heartbeat loop**: sends `[{"type": "health.check", "client_id": ""}]` (array-wrapped per protocol) every 25s (configurable). If no message is received for 35s (configurable), triggers reconnect. - -4. **Disconnect** -- cancels all background tasks (reader, heartbeat, reconnect), closes the socket with code 1000. - -### Reconnection - -Triggered automatically when: -- The server closes the connection with a non-1000 code (code 1000 = intentional close, no reconnect). -- The heartbeat detects no messages within `healthcheck_timeout`. -- A WebSocket error occurs while sending a heartbeat. - -Strategy: -- **Exponential backoff with jitter**: base 0.25s, doubles per attempt, capped at 5s, plus random jitter to prevent thundering herd. -- **Max retries**: 5 by default, then gives up and sets `connected = False`. -- **Token refresh**: if the server rejects with error code 40 (`TOKEN_EXPIRED`) and the token was auto-generated (not a static user-provided token), the cached token is cleared and a fresh JWT is generated for the next attempt. Static tokens are treated as non-refreshable. -- **ws_id guard**: each connection increments an internal counter. The reader loop exits if the counter changes, preventing stale messages from old connections being emitted after reconnect. - -### Event listening - -`StreamWS` extends `StreamAsyncIOEventEmitter` (pyee-based). Two ways to listen: - -```python -# Exact event type (supports decorator syntax via pyee) -@ws.on("message.new") -def handler(event): - ... - -# Wildcard patterns (direct call) -ws.on_wildcard("message.*", handler) # single level: message.new, message.updated -ws.on_wildcard("call.**", handler) # multi level: call.created, call.member.added -ws.on_wildcard("*", handler) # all events - -def handler(event_type, event): - ... -``` - -Both sync and async handlers are supported. - -## Configuration - -All constructor params (also accepted as kwargs to `client.connect_ws()`): - -| Param | Default | Description | -|-------|---------|-------------| -| `api_key` | required | Stream API key | -| `api_secret` | required | Stream API secret (used to generate JWT) | -| `user_id` | required | User ID to connect as | -| `user_details` | `{"id": user_id}` | User details sent during auth | -| `base_url` | `https://chat.stream-io-api.com` | Base API URL | -| `token` | auto-generated | Pre-generated JWT (skips auto-generation, treated as static/non-refreshable) | -| `user_agent` | `stream-python-client-{VERSION}` | Client identifier | -| `healthcheck_interval` | `25.0` | Seconds between heartbeat pings | -| `healthcheck_timeout` | `35.0` | Seconds of silence before reconnect | -| `max_retries` | `5` | Max reconnect attempts before giving up | -| `backoff_base` | `0.25` | Initial backoff delay in seconds | -| `backoff_max` | `5.0` | Maximum backoff delay in seconds | - -Note: `connect_ws()` kwargs override the defaults from `AsyncStream` (e.g. you can pass a different `base_url`). - -## Testing - -### Unit tests (no credentials needed) - -``` -uv run pytest tests/ws/ --timeout=10 -``` - -Tests use a local mock WebSocket server (`websockets.serve` on `127.0.0.1:0`) and cover: -- Instantiation and URL construction -- Auth handshake (success, failure, and unexpected response types) -- Event dispatch to typed and wildcard listeners -- Heartbeat message sending (array format) -- Auto-reconnect on server disconnect (non-1000 close codes) -- No reconnect on intentional close (code 1000) -- Token refresh on error code 40 -- Static token protection (no refresh for user-provided tokens) -- ws_id stale message guard -- `AsyncStream.connect_ws()` integration and `aclose()` cleanup - -### Debugging - -Enable WebSocket-level debug logs: - -```python -import logging -logging.getLogger("getstream.ws.client").setLevel(logging.DEBUG) -logging.getLogger("websockets.client").setLevel(logging.DEBUG) -``` - -## Architecture - -``` -getstream/ws/ - __init__.py -- exports StreamWS, StreamWSAuthError - client.py -- StreamWS class (connection, auth, reader, heartbeat, reconnect) - -getstream/stream.py -- AsyncStream.connect_ws() convenience method - -Tests: - tests/ws/ - test_ws_client.py -- unit tests for StreamWS - test_async_stream_ws.py -- integration tests for AsyncStream.connect_ws() -``` - -Reference implementation: `stream-video-js/packages/client/src/coordinator/connection/connection.ts` - -## Known limitations - -- **Async only** -- WebSocket is inherently async. Sync users can call `stream.as_async()` to get an `AsyncStream`. -- **No `products` filter yet** -- the JS SDK sends `products: ["video"]` or `["chat"]` to filter events. This can be added to the auth payload when needed. -- **Single connection per `connect_ws()` call** -- each call creates a separate WS connection. No multiplexing. -- **Call watching requires manual setup** -- receiving call-scoped events requires a separate REST call with a client-side token (see "Watching a video call" above). This could be wrapped in a helper method in the future. From 85bfa1f2f39e32a45089ac1c4ac7c8f3cca86b19 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 13 Apr 2026 11:15:49 +0400 Subject: [PATCH 39/39] feat: add watch_channels parameter to subscribe to chat channel events StreamWS now accepts watch_channels=[("messaging", channel_id)] which subscribes the WS connection to chat channel events (message.new, etc.) during __aenter__. Uses POST /api/v2/chat/channels with watch=true and the user token, same pattern as watch_call. --- getstream/ws/client.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/getstream/ws/client.py b/getstream/ws/client.py index b6df7158..f3aebfed 100644 --- a/getstream/ws/client.py +++ b/getstream/ws/client.py @@ -3,7 +3,7 @@ import logging import random import time -from typing import Optional, Tuple +from typing import List, Optional, Tuple from urllib.parse import urlencode, urlparse, urlunparse import httpx @@ -44,6 +44,7 @@ def __init__( backoff_base: float = 0.25, backoff_max: float = 5.0, watch_call: Optional[Tuple[str, str]] = None, + watch_channels: Optional[List[Tuple[str, str]]] = None, ): super().__init__() self.api_key = api_key @@ -60,6 +61,7 @@ def __init__( self._backoff_base = backoff_base self._backoff_max = backoff_max self._watch_call = watch_call + self._watch_channels = watch_channels self._websocket: Optional[ClientConnection] = None self._connected = False @@ -159,6 +161,8 @@ async def __aenter__(self): if self._watch_call: call_type, call_id = self._watch_call await self._subscribe_to_call(call_type, call_id) + if self._watch_channels: + await self._subscribe_to_channels(self._watch_channels) return self async def _subscribe_to_call(self, call_type: str, call_id: str) -> None: @@ -184,6 +188,36 @@ async def _subscribe_to_call(self, call_type: str, call_id: str) -> None: self._connection_id, ) + async def _subscribe_to_channels(self, channels: List[Tuple[str, str]]) -> None: + """Subscribe to chat channel events by querying channels with watch=true. + + Uses the user token (not admin token) because the coordinator only + registers subscriptions for client-side requests. + """ + token = self._ensure_token() + parsed = urlparse(self._base_url) + url = urlunparse(parsed._replace(path="/api/v2/chat/channels")) + cids = [f"{ch_type}:{ch_id}" for ch_type, ch_id in channels] + payload = { + "filter_conditions": {"cid": {"$in": cids}}, + "watch": True, + "connection_id": self._connection_id, + } + headers = {"authorization": token, **self._stream_headers} + async with httpx.AsyncClient() as http: + response = await http.post( + url, + params={"api_key": self.api_key}, + headers=headers, + json=payload, + ) + response.raise_for_status() + logger.info( + "Watching %d channel(s) (connection_id=%s)", + len(channels), + self._connection_id, + ) + async def __aexit__(self, exc_type, exc_val, exc_tb): await self.disconnect() return False