Skip to content

Refactor dashboard PubSub consumers to Stream.fromQueue pipelines#95

Open
danielbdyer wants to merge 1 commit into
mainfrom
codex/refactor-to-use-stream.fromqueue-pipelines
Open

Refactor dashboard PubSub consumers to Stream.fromQueue pipelines#95
danielbdyer wants to merge 1 commit into
mainfrom
codex/refactor-to-use-stream.fromqueue-pipelines

Conversation

@danielbdyer
Copy link
Copy Markdown
Owner

Motivation

  • Replace fragile manual Queue.take loops with Stream.fromQueue pipelines to centralize consumer behaviour and reduce imperative loop code.
  • Provide a reusable consumer abstraction that supports event-kind filtering, batching/time-window semantics, and explicit error hooks so multiple subscribers share consistent flush/ordering guarantees.
  • Preserve the Effect PubSub as the canonical event source while making downstream consumers (SAB writer, WS broadcaster, journal writer) composable via streams.

Description

  • Added a reusable stream consumer module lib/infrastructure/dashboard/pubsub-stream-consumer.ts exposing subscribePubSubStreamConsumer with options for eventKinds, batching (groupedWithin), onEvent/onBatch sinks, and onError handling.
  • Replaced manual Queue.take intake loops in lib/infrastructure/dashboard/pipeline-event-bus.ts with the new consumer for the SharedArrayBuffer + string channel writer and for the WS broadcaster.
  • Refactored lib/infrastructure/dashboard/journal-writer.ts to consume events via the shared stream consumer using batched groupedWithin semantics and to perform batched enrich/append/index updates while preserving the original spaced flush cadence.
  • Added law-style tests tests/dashboard-stream-consumer.laws.spec.ts that validate no event loss and FIFO ordering under burst load and that batching preserves spaced flush semantics.

Testing

  • Ran the targeted test suite with npm run test -- tests/dashboard-stream-consumer.laws.spec.ts tests/pubsub-backpressure.laws.spec.ts tests/flywheel-server-integration.laws.spec.ts and all tests passed (22 passed).
  • The new dashboard-stream-consumer tests exercised burst-load ordering and groupedWithin-based batching behavior and succeeded.
  • Type checking via npm run typecheck surfaced unrelated missing generated module declarations (../generated/tesseract-knowledge / ../lib/generated/tesseract-knowledge) and failed; this is pre-existing and not caused by these changes.

Codex Task

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant