Summary
The current WebSocket broadcast in main.py uses in-process iteration with bare except: pass error handling. At scale (30+ agents, 50+ WebSocket connections), clients silently miss critical events — collaboration updates, health alerts, execution status changes — with no way to recover missed messages on reconnect.
Solution
Replace the in-process broadcast with Redis Streams as the event bus. Redis is already running. Streams provide:
- Persistent event log with configurable retention (e.g., last 1000 events or 1 hour)
- Replay on reconnect via XRANGE: clients that disconnect and reconnect receive the events they missed
- Consumer groups for at-least-once delivery semantics
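- Pull-based reads via the BLOCK parameter on XREAD: each consumer reads at its own pace, so a slow client falls behind in the stream instead of stalling the publisher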
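A minimal sketch of these primitives with redis-py, assuming a client created as redis.Redis(decode_responses=True) and a hypothetical stream key "events" with the JSON payload stored in a single "data" field. The helper names (publish, replay, wait_for_new, next_stream_id) are illustrative, not existing code in main.py. The client is passed in as a parameter, so the module does not import redis itself.

```python
import json
from typing import Any

STREAM = "events"  # hypothetical stream key
MAXLEN = 10_000    # approximate cap, as in "MAXLEN ~10000"


def next_stream_id(last_id: str) -> str:
    """Smallest stream ID strictly greater than last_id ("<ms>-<seq>").

    XRANGE bounds are inclusive. Redis >= 6.2 also accepts "(" + last_id as
    an exclusive start; incrementing the sequence works on any version.
    """
    ms, seq = (int(part) for part in last_id.split("-"))
    if seq == 2**64 - 1:  # sequence exhausted: roll over to the next millisecond
        return f"{ms + 1}-0"
    return f"{ms}-{seq + 1}"


def publish(r: Any, event: dict) -> str:
    """XADD one event; approximate trimming (MAXLEN ~) keeps the stream bounded."""
    return r.xadd(STREAM, {"data": json.dumps(event)}, maxlen=MAXLEN, approximate=True)


def replay(r: Any, last_id: str, count: int = 1000) -> list:
    """Up to `count` entries published after last_id that survived trimming."""
    return r.xrange(STREAM, min=next_stream_id(last_id), max="+", count=count)


def wait_for_new(r: Any, last_id: str, block_ms: int = 5000) -> list:
    """Block up to block_ms for entries newer than last_id (XREAD is exclusive)."""
    resp = r.xread({STREAM: last_id}, count=100, block=block_ms)
    # Default (RESP2) reply shape: [[stream_name, [(entry_id, fields), ...]]]
    return resp[0][1] if resp else []
```

Because XREAD already returns only entries newer than the given ID, a reconnecting client could also skip XRANGE and resume with XREAD alone; XRANGE is convenient when the replay should be paged separately from the live tail.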
Scope
- Replace ConnectionManager.broadcast() with a Redis Stream publish (XADD)
- Each WebSocket connection reads from the Redis Stream (XREAD BLOCK)
- Track per-client last-seen event ID; on reconnect, replay from that point
- Add stream trimming (MAXLEN ~10000 or time-based) to prevent unbounded growth
- Replace the FilteredWebSocketManager broadcast with a stream consumer that filters by user permissions
- Remove the except: pass pattern; errors are now tracked per client, with eviction after N failures
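One way the per-client side of this scope could look, as a sketch: each connection keeps its own last-seen ID and a consecutive-failure counter, passes events through a permission check, and is flagged for eviction instead of having errors swallowed. ClientState, deliver, and MAX_FAILURES = 3 are hypothetical names and values, and entries are assumed to carry their JSON payload in a "data" field, matching the (entry_id, fields) pairs redis-py returns from XREAD/XRANGE with decode_responses=True.

```python
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

MAX_FAILURES = 3  # hypothetical value for "eviction after N failures"


@dataclass
class ClientState:
    """Per-connection bookkeeping (hypothetical; not the existing ConnectionManager)."""

    send: Callable[[str], Awaitable[None]]  # e.g. the WebSocket's send_text coroutine
    can_see: Callable[[dict], bool]  # permission filter for this connection's user
    last_id: str = "0-0"  # last stream ID this client has handled
    failures: int = 0  # consecutive send failures


async def deliver(client: ClientState, entries: list[tuple[str, dict]]) -> bool:
    """Send permitted entries in stream order; return False if the client should be evicted."""
    for entry_id, fields in entries:
        event = json.loads(fields["data"])
        if client.can_see(event):
            try:
                # Include the stream ID so the frontend can send it back on reconnect.
                await client.send(json.dumps({"id": entry_id, "event": event}))
            except Exception:
                client.failures += 1
                logger.warning(
                    "send failed for entry %s (%d/%d)",
                    entry_id, client.failures, MAX_FAILURES, exc_info=True,
                )
                # last_id is not advanced, so the next read retries this entry.
                return client.failures < MAX_FAILURES
            client.failures = 0
        # Entries hidden by the permission filter still advance the cursor.
        client.last_id = entry_id
    return True


if __name__ == "__main__":
    async def _print(msg: str) -> None:
        print(msg)

    demo = ClientState(send=_print, can_see=lambda e: e.get("public", False))
    batch = [("1-0", {"data": '{"public": true}'}), ("2-0", {"data": '{"public": false}'})]
    asyncio.run(deliver(demo, batch))
    print(demo.last_id)
```

A per-connection task would then loop: read a batch with XREAD BLOCK starting from client.last_id, hand it to deliver(), and close the socket and drop the client when it returns False. Resetting the counter after each successful send makes N count consecutive failures, which is one reading of the issue's eviction rule.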
Acceptance Criteria
Key Files
src/backend/main.py — replace ConnectionManager and FilteredWebSocketManager
src/backend/services/activity_service.py — publish to stream instead of direct broadcast
src/frontend/src/utils/websocket.js — send last-event-ID on reconnect
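On the backend side of the reconnect, main.py has to turn whatever websocket.js sends into a safe starting point. A minimal sketch, assuming the ID arrives as a hypothetical last_event_id value (for example a query parameter) and the same hypothetical "events" stream key: a well-formed ID is used as-is, and a missing or malformed one pins the client to the current newest entry rather than "$", because re-issuing XREAD with "$" after an empty read can skip entries added between the two calls.

```python
import re
from typing import Any, Optional

STREAM = "events"  # hypothetical stream key
_STREAM_ID = re.compile(r"[0-9]+-[0-9]+")  # "<milliseconds>-<sequence>"


def resume_point(r: Any, last_event_id: Optional[str]) -> str:
    """Return the stream ID after which this connection should start reading.

    r is assumed to be a redis.Redis client created with decode_responses=True.
    """
    if last_event_id and _STREAM_ID.fullmatch(last_event_id):
        # XREAD returns entries strictly newer than this ID, so the client
        # gets what it missed (minus anything already trimmed away).
        return last_event_id
    # Fresh or malformed: start after the newest existing entry.
    newest = r.xrevrange(STREAM, count=1)
    return newest[0][0] if newest else "0-0"
```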
Dependencies
None — Redis already running.