feat(bus): migrate Bus to Effect service with PubSub#18173
Open
kitlangton wants to merge 7 commits intodevfrom
Open
feat(bus): migrate Bus to Effect service with PubSub#18173kitlangton wants to merge 7 commits intodevfrom
kitlangton wants to merge 7 commits intodevfrom
Conversation
Constrain BusEvent.define to ZodObject instead of ZodType so TS knows
event properties are always a record. Type GlobalBus payload as
{ type: string; properties: Record<string, unknown> } instead of any.
Refactor watcher test to use Bus.subscribe instead of raw GlobalBus
listener, removing hand-rolled event types and unnecessary casts.
Covers publish/subscribe, multiple subscribers, unsubscribe, subscribeAll, once, GlobalBus forwarding, instance isolation, disposal, and async subscribers.
Add Bus.Service as a ServiceMap.Service backed by Effect PubSub: - publish() pushes to per-type + wildcard PubSubs and GlobalBus - subscribe() returns a typed Stream via Stream.fromPubSub - subscribeAll() returns a wildcard Stream Legacy adapters wrap the Effect service: - publish → runPromiseInstance - subscribe/subscribeAll → runCallbackInstance with Stream.runForEach Other changes: - Register Bus.Service in Instances LayerMap - Add runCallbackInstance helper to effect/runtime - Remove unused Bus.once (zero callers) - Skip PubSub creation on publish when no subscribers exist - Move subscribe/unsubscribe logging into the Effect service layer
Replace the implicit Bus.subscribe(File.Event.Edited) formatter with an explicit Format.run(filepath) call in write/edit/apply_patch tools. This ensures formatting completes before FileTime stamps and LSP diagnostics run, rather than relying on the bus to block on subscribers. - Add Format.run() to the Effect service interface and legacy adapter - Call Format.run() in write, edit, and apply_patch tools after writes - Remove Bus subscription from Format layer
Use forkInstance + Fiber.interrupt (which awaits) instead of runCallbackInstance + interruptUnsafe (fire-and-forget) for subscribeAll. This ensures the fiber completes before layer invalidation, allowing the RcMap refCount to drop to 0. subscribeAll now delivers InstanceDisposed as the last callback message via Effect.ensuring when the fiber is interrupted during disposal, but not on manual unsubscribe. Add priority support to registerDisposer so Bus can interrupt subscription fibers (priority -1) before layer invalidation (priority 0). Add forkInstance helper to effect/runtime that returns a Fiber instead of an interrupt function.
The sync callback API can't wait for async layer acquisition, so delivering InstanceDisposed through the PubSub stream is a race condition. Instead, the legacy subscribeAll adapter listens on GlobalBus for InstanceDisposed matching the current directory. The Effect service's stream ending IS the disposal signal for Effect consumers — this is only needed for the legacy callback API. Also reverts forceInvalidate, fiber tracking, priority-based disposal, and other workaround attempts. Clean simple solution.
…ffect tests Legacy subscribeAll delivers InstanceDisposed via GlobalBus because the fiber starts asynchronously and may not be running when disposal happens. This bridge can be removed once upstream PubSub.shutdown properly wakes suspended subscribers. Add forceInvalidate in Instances that closes the RcMap entry scope regardless of refCount. Standard RcMap.invalidate bails when refCount > 0 — an upstream issue (Effect-TS/effect-smol#1799). Add PubSub shutdown finalizer to Bus layer so layer teardown properly cleans up PubSubs. Add Effect-native tests proving forkScoped + scope closure works correctly: ensuring fires when the scope closes, streams receive published events. Remove stale GlobalBus disposal test (instance.ts responsibility).
Contributor
Author
Upstream Effect PRsThis migration uncovered three issues in effect-smol that affect LayerMap-based service lifecycle management:
Once these land, the GlobalBus bridge in |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Migrate the Bus service to Effect with PubSub internals, and make a deliberate semantic change: publish is now fire-and-forget — it no longer blocks on subscriber callbacks completing.
Bus Effect Service
Bus.Serviceas aServiceMap.Servicebacked by EffectPubSubpublish(def, props)→Effect<void>— pushes to per-type + wildcard PubSubs and emits to GlobalBussubscribe(def)→Stream<Payload<D>>— returns a typed stream viaStream.fromPubSubsubscribeAll()→Stream<Payload>— returns a wildcard streampublishviarunPromiseInstance,subscribe/subscribeAllviarunCallbackInstance(returns interruptor as unsubscribe)runCallbackInstancehelper toeffect/runtimefor long-lived stream subscriptionsType tightening
GlobalBuspayload:any→{ type: string; properties: Record<string, unknown> }BusEvent.define: constrainPropertiestoZodObjectinstead ofZodType, so TS knows event properties are always a recordBus.subscribeinstead of rawGlobalBuslistenerBehavioral changes
Publish is fire-and-forget. The old Bus awaited
Promise.allof subscriber callbacks before returning. With PubSub, publish enqueues messages and returns immediately. This is intentional — a pub/sub system that blocks on all subscribers is a hidden control-flow dependency, not a bus. One slow listener shouldn't turn the bus into a synchronous pipeline.We audited every
await Bus.publish(...)+ subscriber pair in the codebase. The only case where the old blocking behavior was load-bearing was formatting: the write/edit/apply_patch tools publishedFile.Event.Edited, and the formatter subscribed with an async callback that spawned an external process. The old bus happened to await the formatter finishing before the tool continued to stampFileTimeand run LSP diagnostics.Fix: make formatting explicit. Rather than relying on the bus to implicitly sequence formatting, we now call
Format.run(filepath)directly in the write tools, right afterFilesystem.write()and beforeFileTime.read()/ LSP diagnostics. This is clearer, more predictable, and means theFile.Event.Editedevent now reflects the final (post-format) file state. The Bus subscription in the Format layer is removed entirely —File.Event.Editednow has zero local subscribers (it still goes to GlobalBus/SSE for UI consumers).All other subscriber pairs are either informational (VCS branch detection, share sync, plugin hooks) or have zero subscribers. None depend on publish blocking.
Cleanup
Bus.once(zero callers in codebase)Instance.state-based subscription management (replaced by PubSub)InstanceDisposedis already emitted to GlobalBus byinstance.ts— the Bus no longer duplicates thisTest plan