
feat: Redis Streams event bus for reliable WebSocket delivery (RELIABILITY-003) #306

@vybe

Description


Summary

The current WebSocket broadcast in main.py iterates over connections in-process and swallows send errors with a bare except: pass. At scale (30+ agents, 50+ WebSocket connections), clients silently miss critical events such as collaboration updates, health alerts, and execution status changes, and they have no way to recover missed messages on reconnect.

Solution

Replace the in-process broadcast with Redis Streams as the event bus. Redis is already running. Streams provide:

  • Persistent event log with configurable retention (e.g., last 1000 events or 1 hour)
  • Replay on reconnect via XRANGE — clients that disconnect and reconnect get missed events
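  • Consumer groups (XREADGROUP + XACK) for at-least-once delivery semantics; note that a group hands each entry to only one of its consumers, so broadcasting to every WebSocket needs plain XREAD per connection or one group per client
  • Pull-based flow control: XREAD BLOCK waits for new entries instead of busy-polling, and COUNT caps each batch, so every consumer reads at its own pace without slowing the publisher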

Scope

  • Replace ConnectionManager.broadcast() with Redis Stream publish (XADD)
  • Each WebSocket connection reads from Redis Stream (XREAD BLOCK)
  • Track per-client last-seen event ID; on reconnect, replay from that point
  • Add stream trimming (MAXLEN ~10000, or time-based via MINID, which needs Redis 6.2+) to prevent unbounded growth
  • Replace FilteredWebSocketManager broadcast with stream consumer that filters by user permissions
  • Remove the except: pass pattern; errors are now tracked per client, and a client is evicted after N consecutive failures

Acceptance Criteria

  • All real-time events (agent_activity, agent_collaboration, process events, health) flow through Redis Streams
  • WebSocket clients that disconnect and reconnect receive missed events (up to retention window)
  • Failed sends to a client are logged and client is evicted after 3 consecutive failures
  • Stream auto-trims to prevent unbounded Redis memory growth
  • Existing WebSocket event format unchanged (backward compatible for frontend)
  • ~100 lines to replace ConnectionManager, zero new infrastructure
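The eviction rule above (log every failed send, evict after 3 consecutive failures) could be tracked with a small helper like this hypothetical FailureTracker; the logger name and method names are illustrative and are not existing code in main.py.

```python
import logging
from dataclasses import dataclass, field

log = logging.getLogger("ws.delivery")  # hypothetical logger name

MAX_CONSECUTIVE_FAILURES = 3  # from the acceptance criteria


@dataclass
class FailureTracker:
    limit: int = MAX_CONSECUTIVE_FAILURES
    _failures: dict[str, int] = field(default_factory=dict, init=False, repr=False)

    def record_success(self, client_id: str) -> None:
        # A successful send resets the streak: only consecutive failures count.
        self._failures.pop(client_id, None)

    def record_failure(self, client_id: str, exc: BaseException) -> bool:
        """Log the error; return True when the client should be evicted."""
        count = self._failures.get(client_id, 0) + 1
        self._failures[client_id] = count
        log.warning("send to %s failed (%d/%d): %r", client_id, count, self.limit, exc)
        if count >= self.limit:
            self._failures.pop(client_id, None)  # forget state for evicted client
            return True
        return False
```

In the send path, wrap each send in try/except Exception, call record_failure in the handler, and close and unregister the socket when it returns True; calling record_success after each successful send keeps intermittent errors from accumulating toward eviction.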

Key Files

  • src/backend/main.py — replace ConnectionManager and FilteredWebSocketManager
  • src/backend/services/activity_service.py — publish to stream instead of direct broadcast
  • src/frontend/src/utils/websocket.js — send last-event-ID on reconnect

Dependencies

None — Redis already running.

Metadata


Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
