Skip to content

feat(bus): migrate Bus to Effect service with PubSub#18173

Open
kitlangton wants to merge 7 commits intodevfrom
kit/effect-bus
Open

feat(bus): migrate Bus to Effect service with PubSub#18173
kitlangton wants to merge 7 commits intodevfrom
kit/effect-bus

Conversation

@kitlangton
Copy link
Contributor

@kitlangton kitlangton commented Mar 19, 2026

Summary

Migrate the Bus service to Effect with PubSub internals, and make a deliberate semantic change: publish is now fire-and-forget — it no longer blocks on subscriber callbacks completing.

Bus Effect Service

  • Add Bus.Service as a ServiceMap.Service backed by Effect PubSub
  • publish(def, props)Effect<void> — pushes to per-type + wildcard PubSubs and emits to GlobalBus
  • subscribe(def)Stream<Payload<D>> — returns a typed stream via Stream.fromPubSub
  • subscribeAll()Stream<Payload> — returns a wildcard stream
  • Legacy adapters wrap the Effect service: publish via runPromiseInstance, subscribe/subscribeAll via runCallbackInstance (returns interruptor as unsubscribe)
  • Add runCallbackInstance helper to effect/runtime for long-lived stream subscriptions

Type tightening

  • GlobalBus payload: any{ type: string; properties: Record<string, unknown> }
  • BusEvent.define: constrain Properties to ZodObject instead of ZodType, so TS knows event properties are always a record
  • Refactor watcher test to use Bus.subscribe instead of raw GlobalBus listener

Behavioral changes

Publish is fire-and-forget. The old Bus awaited Promise.all of subscriber callbacks before returning. With PubSub, publish enqueues messages and returns immediately. This is intentional — a pub/sub system that blocks on all subscribers is a hidden control-flow dependency, not a bus. One slow listener shouldn't turn the bus into a synchronous pipeline.

We audited every await Bus.publish(...) + subscriber pair in the codebase. The only case where the old blocking behavior was load-bearing was formatting: the write/edit/apply_patch tools published File.Event.Edited, and the formatter subscribed with an async callback that spawned an external process. The old bus happened to await the formatter finishing before the tool continued to stamp FileTime and run LSP diagnostics.

Fix: make formatting explicit. Rather than relying on the bus to implicitly sequence formatting, we now call Format.run(filepath) directly in the write tools, right after Filesystem.write() and before FileTime.read() / LSP diagnostics. This is clearer, more predictable, and means the File.Event.Edited event now reflects the final (post-format) file state. The Bus subscription in the Format layer is removed entirely — File.Event.Edited now has zero local subscribers (it still goes to GlobalBus/SSE for UI consumers).

All other subscriber pairs are either informational (VCS branch detection, share sync, plugin hooks) or have zero subscribers. None depend on publish blocking.

Cleanup

  • Remove unused Bus.once (zero callers in codebase)
  • Remove Instance.state-based subscription management (replaced by PubSub)
  • InstanceDisposed is already emitted to GlobalBus by instance.ts — the Bus no longer duplicates this

Test plan

  • Comprehensive bus test suite (15 tests): publish/subscribe, multiple subscribers, unsubscribe, subscribeAll, GlobalBus forwarding, instance isolation, disposal, async subscribers
  • Format tests pass (4 tests)
  • All existing tests pass (1229 pass, 2 flaky SQLite I/O failures unrelated to changes)
  • Typecheck passes across all packages

Constrain BusEvent.define to ZodObject instead of ZodType so TS knows
event properties are always a record. Type GlobalBus payload as
{ type: string; properties: Record<string, unknown> } instead of any.

Refactor watcher test to use Bus.subscribe instead of raw GlobalBus
listener, removing hand-rolled event types and unnecessary casts.
Covers publish/subscribe, multiple subscribers, unsubscribe, subscribeAll,
once, GlobalBus forwarding, instance isolation, disposal, and async subscribers.
Add Bus.Service as a ServiceMap.Service backed by Effect PubSub:
- publish() pushes to per-type + wildcard PubSubs and GlobalBus
- subscribe() returns a typed Stream via Stream.fromPubSub
- subscribeAll() returns a wildcard Stream

Legacy adapters wrap the Effect service:
- publish → runPromiseInstance
- subscribe/subscribeAll → runCallbackInstance with Stream.runForEach

Other changes:
- Register Bus.Service in Instances LayerMap
- Add runCallbackInstance helper to effect/runtime
- Remove unused Bus.once (zero callers)
- Skip PubSub creation on publish when no subscribers exist
- Move subscribe/unsubscribe logging into the Effect service layer
Replace the implicit Bus.subscribe(File.Event.Edited) formatter with
an explicit Format.run(filepath) call in write/edit/apply_patch tools.

This ensures formatting completes before FileTime stamps and LSP
diagnostics run, rather than relying on the bus to block on subscribers.

- Add Format.run() to the Effect service interface and legacy adapter
- Call Format.run() in write, edit, and apply_patch tools after writes
- Remove Bus subscription from Format layer
Use forkInstance + Fiber.interrupt (which awaits) instead of
runCallbackInstance + interruptUnsafe (fire-and-forget) for
subscribeAll. This ensures the fiber completes before layer
invalidation, allowing the RcMap refCount to drop to 0.

subscribeAll now delivers InstanceDisposed as the last callback
message via Effect.ensuring when the fiber is interrupted during
disposal, but not on manual unsubscribe.

Add priority support to registerDisposer so Bus can interrupt
subscription fibers (priority -1) before layer invalidation
(priority 0).

Add forkInstance helper to effect/runtime that returns a Fiber
instead of an interrupt function.
The sync callback API can't wait for async layer acquisition, so
delivering InstanceDisposed through the PubSub stream is a race
condition. Instead, the legacy subscribeAll adapter listens on
GlobalBus for InstanceDisposed matching the current directory.

The Effect service's stream ending IS the disposal signal for
Effect consumers — this is only needed for the legacy callback API.

Also reverts forceInvalidate, fiber tracking, priority-based
disposal, and other workaround attempts. Clean simple solution.
…ffect tests

Legacy subscribeAll delivers InstanceDisposed via GlobalBus because
the fiber starts asynchronously and may not be running when disposal
happens. This bridge can be removed once upstream PubSub.shutdown
properly wakes suspended subscribers.

Add forceInvalidate in Instances that closes the RcMap entry scope
regardless of refCount. Standard RcMap.invalidate bails when
refCount > 0 — an upstream issue (Effect-TS/effect-smol#1799).

Add PubSub shutdown finalizer to Bus layer so layer teardown
properly cleans up PubSubs.

Add Effect-native tests proving forkScoped + scope closure works
correctly: ensuring fires when the scope closes, streams receive
published events.

Remove stale GlobalBus disposal test (instance.ts responsibility).
@kitlangton
Copy link
Contributor Author

Upstream Effect PRs

This migration uncovered three issues in effect-smol that affect LayerMap-based service lifecycle management:

  1. RcMap.invalidate bails on refCount > 0fix(RcMap): invalidate should close scope regardless of refCount Effect-TS/effect-smol#1799
  2. PubSub.shutdown doesn't interrupt suspended subscribersFix PubSub shutdown to interrupt suspended subscribers Effect-TS/effect-smol#1800
  3. PubSub.publish returns false on shutdown instead of interruptingPubSub.publish returns false on shutdown instead of interrupting Effect-TS/effect-smol#1802
  4. Stream.fromPubSub doesn't propagate interrupts on PubSub shutdownStream.fromPubSub propagates interrupts on PubSub shutdown Effect-TS/effect-smol#1804

Once these land, the GlobalBus bridge in subscribeAll can be removed — PubSub shutdown will naturally end streams and fire ensuring callbacks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant