From f63a2a2636f8e25651d28af249ed0e50c062f258 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 20:57:08 -0400 Subject: [PATCH 1/9] fix(bus): tighten GlobalBus payload and BusEvent.define types Constrain BusEvent.define to ZodObject instead of ZodType so TS knows event properties are always a record. Type GlobalBus payload as { type: string; properties: Record } instead of any. Refactor watcher test to use Bus.subscribe instead of raw GlobalBus listener, removing hand-rolled event types and unnecessary casts. --- packages/opencode/src/bus/bus-event.ts | 4 ++-- packages/opencode/src/bus/global.ts | 2 +- .../opencode/src/control-plane/workspace.ts | 2 +- packages/opencode/test/file/watcher.test.ts | 20 +++++++------------ 4 files changed, 11 insertions(+), 17 deletions(-) diff --git a/packages/opencode/src/bus/bus-event.ts b/packages/opencode/src/bus/bus-event.ts index 7fe13833c86..1d9a31d4a27 100644 --- a/packages/opencode/src/bus/bus-event.ts +++ b/packages/opencode/src/bus/bus-event.ts @@ -1,5 +1,5 @@ import z from "zod" -import type { ZodType } from "zod" +import type { ZodObject, ZodRawShape } from "zod" import { Log } from "../util/log" export namespace BusEvent { @@ -9,7 +9,7 @@ export namespace BusEvent { const registry = new Map() - export function define(type: Type, properties: Properties) { + export function define>(type: Type, properties: Properties) { const result = { type, properties, diff --git a/packages/opencode/src/bus/global.ts b/packages/opencode/src/bus/global.ts index 43386dd6b20..dcc7664007e 100644 --- a/packages/opencode/src/bus/global.ts +++ b/packages/opencode/src/bus/global.ts @@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{ event: [ { directory?: string - payload: any + payload: { type: string; properties: Record } }, ] }>() diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index c3c28ed6057..d1f884f35a5 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -123,7 +123,7 @@ export namespace Workspace { await parseSSE(res.body, stop, (event) => { GlobalBus.emit("event", { directory: space.id, - payload: event, + payload: event as { type: string; properties: Record }, }) }) // Wait 250ms and retry if SSE connection fails diff --git a/packages/opencode/test/file/watcher.test.ts b/packages/opencode/test/file/watcher.test.ts index 2cd27643e88..6d4c5f402fd 100644 --- a/packages/opencode/test/file/watcher.test.ts +++ b/packages/opencode/test/file/watcher.test.ts @@ -5,9 +5,9 @@ import path from "path" import { Deferred, Effect, Option } from "effect" import { tmpdir } from "../fixture/fixture" import { watcherConfigLayer, withServices } from "../fixture/instance" +import { Bus } from "../../src/bus" import { FileWatcher } from "../../src/file/watcher" import { Instance } from "../../src/project/instance" -import { GlobalBus } from "../../src/bus/global" // Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows) const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip @@ -16,7 +16,6 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc // Helpers // --------------------------------------------------------------------------- -type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } } type WatcherEvent = { file: string; event: "add" | "change" | "unlink" } /** Run `body` with a live FileWatcher service. */ @@ -36,22 +35,17 @@ function withWatcher(directory: string, body: Effect.Effect) { function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) { let done = false - function on(evt: BusUpdate) { + const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => { if (done) return - if (evt.directory !== directory) return - if (evt.payload.type !== FileWatcher.Event.Updated.type) return - if (!check(evt.payload.properties)) return - hit(evt.payload.properties) - } + if (!check(evt.properties)) return + hit(evt.properties) + }) - function cleanup() { + return () => { if (done) return done = true - GlobalBus.off("event", on) + unsub() } - - GlobalBus.on("event", on) - return cleanup } function wait(directory: string, check: (evt: WatcherEvent) => boolean) { From 645c15351bbecd8ce72bf3f0719c8fd4d1c47631 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 21:05:36 -0400 Subject: [PATCH 2/9] test(bus): add comprehensive test suite for Bus service Covers publish/subscribe, multiple subscribers, unsubscribe, subscribeAll, once, GlobalBus forwarding, instance isolation, disposal, and async subscribers. --- packages/opencode/test/bus/bus.test.ts | 320 +++++++++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 packages/opencode/test/bus/bus.test.ts diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts new file mode 100644 index 00000000000..64a21e79925 --- /dev/null +++ b/packages/opencode/test/bus/bus.test.ts @@ -0,0 +1,320 @@ +import { afterEach, describe, expect, test } from "bun:test" +import z from "zod" +import { Bus } from "../../src/bus" +import { BusEvent } from "../../src/bus/bus-event" +import { GlobalBus } from "../../src/bus/global" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +// --------------------------------------------------------------------------- +// Test event definitions +// --------------------------------------------------------------------------- + +const TestEvent = { + Ping: BusEvent.define("test.ping", z.object({ value: z.number() })), + Pong: BusEvent.define("test.pong", z.object({ message: z.string() })), +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function withInstance(directory: string, fn: () => Promise) { + return Instance.provide({ directory, fn }) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("Bus", () => { + afterEach(() => Instance.disposeAll()) + + describe("publish + subscribe", () => { + test("subscriber receives matching events", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Ping, { value: 42 }) + await Bus.publish(TestEvent.Ping, { value: 99 }) + }) + + expect(received).toEqual([42, 99]) + }) + + test("subscriber does not receive events of other types", async () => { + await using tmp = await tmpdir() + const pings: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + pings.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Pong, { message: "hello" }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + + expect(pings).toEqual([1]) + }) + + test("publish with no subscribers does not throw", async () => { + await using tmp = await tmpdir() + + await withInstance(tmp.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + }) + }) + + describe("multiple subscribers", () => { + test("all subscribers for same event type are called", async () => { + await using tmp = await tmpdir() + const a: number[] = [] + const b: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value)) + Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value)) + await Bus.publish(TestEvent.Ping, { value: 7 }) + }) + + expect(a).toEqual([7]) + expect(b).toEqual([7]) + }) + + test("subscribers are called in registration order", async () => { + await using tmp = await tmpdir() + const order: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, () => order.push("first")) + Bus.subscribe(TestEvent.Ping, () => order.push("second")) + Bus.subscribe(TestEvent.Ping, () => order.push("third")) + await Bus.publish(TestEvent.Ping, { value: 0 }) + }) + + expect(order).toEqual(["first", "second", "third"]) + }) + }) + + describe("unsubscribe", () => { + test("unsubscribe stops delivery", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsub() + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(received).toEqual([1]) + }) + + test("unsubscribe is idempotent", async () => { + await using tmp = await tmpdir() + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent.Ping, () => {}) + unsub() + unsub() // should not throw + }) + }) + + test("unsubscribing one does not affect others", async () => { + await using tmp = await tmpdir() + const a: number[] = [] + const b: number[] = [] + + await withInstance(tmp.path, async () => { + const unsubA = Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value)) + Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value)) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsubA() + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(a).toEqual([1]) + expect(b).toEqual([1, 2]) + }) + }) + + describe("subscribeAll", () => { + test("receives events of all types", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Pong, { message: "hi" }) + }) + + expect(all).toEqual(["test.ping", "test.pong"]) + }) + + test("subscribeAll + typed subscribe both fire", async () => { + await using tmp = await tmpdir() + const typed: number[] = [] + const wild: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => typed.push(evt.properties.value)) + Bus.subscribeAll((evt) => wild.push(evt.type)) + await Bus.publish(TestEvent.Ping, { value: 5 }) + }) + + expect(typed).toEqual([5]) + expect(wild).toEqual(["test.ping"]) + }) + + test("unsubscribe from subscribeAll", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribeAll((evt) => all.push(evt.type)) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsub() + await Bus.publish(TestEvent.Pong, { message: "missed" }) + }) + + expect(all).toEqual(["test.ping"]) + }) + }) + + describe("once", () => { + test("fires once when callback returns 'done'", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.once(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + return "done" + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(received).toEqual([1]) + }) + + test("keeps listening when callback returns undefined", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.once(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + if (evt.properties.value === 3) return "done" + return undefined + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Ping, { value: 2 }) + await Bus.publish(TestEvent.Ping, { value: 3 }) + await Bus.publish(TestEvent.Ping, { value: 4 }) + }) + + expect(received).toEqual([1, 2, 3]) + }) + }) + + describe("GlobalBus forwarding", () => { + test("publish emits to GlobalBus with directory", async () => { + await using tmp = await tmpdir() + const globalEvents: Array<{ directory?: string; payload: any }> = [] + + const handler = (evt: any) => globalEvents.push(evt) + GlobalBus.on("event", handler) + + try { + await withInstance(tmp.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 42 }) + }) + + const ping = globalEvents.find((e) => e.payload.type === "test.ping") + expect(ping).toBeDefined() + expect(ping!.directory).toBe(tmp.path) + expect(ping!.payload).toEqual({ + type: "test.ping", + properties: { value: 42 }, + }) + } finally { + GlobalBus.off("event", handler) + } + }) + }) + + describe("instance isolation", () => { + test("subscribers in one instance do not receive events from another", async () => { + await using tmpA = await tmpdir() + await using tmpB = await tmpdir() + const eventsA: number[] = [] + const eventsB: number[] = [] + + await withInstance(tmpA.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => eventsA.push(evt.properties.value)) + }) + + await withInstance(tmpB.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => eventsB.push(evt.properties.value)) + }) + + await withInstance(tmpA.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + + await withInstance(tmpB.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(eventsA).toEqual([1]) + expect(eventsB).toEqual([2]) + }) + }) + + describe("instance disposal", () => { + test("wildcard subscribers receive InstanceDisposed on disposal", async () => { + await using tmp = await tmpdir() + const events: Array<{ type: string }> = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => events.push({ type: evt.type })) + }) + + await Instance.disposeAll() + + const disposed = events.find((e) => e.type === "server.instance.disposed") + expect(disposed).toBeDefined() + }) + }) + + describe("async subscribers", () => { + test("publish awaits async subscriber promises", async () => { + await using tmp = await tmpdir() + const order: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, async () => { + await new Promise((r) => setTimeout(r, 10)) + order.push("async-done") + }) + + await Bus.publish(TestEvent.Ping, { value: 1 }) + order.push("after-publish") + }) + + expect(order).toEqual(["async-done", "after-publish"]) + }) + }) +}) From f3cf519d983854f3d77d3a7f123e84e9a7a64d15 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 21:36:04 -0400 Subject: [PATCH 3/9] feat(bus): migrate Bus to Effect service with PubSub internals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add Bus.Service as a ServiceMap.Service backed by Effect PubSub: - publish() pushes to per-type + wildcard PubSubs and GlobalBus - subscribe() returns a typed Stream via Stream.fromPubSub - subscribeAll() returns a wildcard Stream Legacy adapters wrap the Effect service: - publish → runPromiseInstance - subscribe/subscribeAll → runCallbackInstance with Stream.runForEach Other changes: - Register Bus.Service in Instances LayerMap - Add runCallbackInstance helper to effect/runtime - Remove unused Bus.once (zero callers) - Skip PubSub creation on publish when no subscribers exist - Move subscribe/unsubscribe logging into the Effect service layer --- packages/opencode/src/bus/index.ts | 161 ++++++++++++---------- packages/opencode/src/effect/instances.ts | 3 + packages/opencode/src/effect/runtime.ts | 6 + packages/opencode/test/bus/bus.test.ts | 75 ++++------ 4 files changed, 121 insertions(+), 124 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..5a1c106f4c5 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,12 +1,13 @@ import z from "zod" +import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" +import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) - type Subscription = (event: any) => void export const InstanceDisposed = BusEvent.define( "server.instance.disposed", @@ -15,91 +16,105 @@ export namespace Bus { }), ) - const state = Instance.state( - () => { - const subscriptions = new Map() + // --------------------------------------------------------------------------- + // Service definition + // --------------------------------------------------------------------------- - return { - subscriptions, + type Payload = { + type: D["type"] + properties: z.infer + } + + export interface Interface { + readonly publish: ( + def: D, + properties: z.output, + ) => Effect.Effect + readonly subscribe: (def: D) => Stream.Stream> + readonly subscribeAll: () => Stream.Stream + } + + export class Service extends ServiceMap.Service()("@opencode/Bus") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const pubsubs = new Map>() + const wildcardPubSub = yield* PubSub.unbounded() + + const getOrCreate = Effect.fnUntraced(function* (type: string) { + let ps = pubsubs.get(type) + if (!ps) { + ps = yield* PubSub.unbounded() + pubsubs.set(type, ps) + } + return ps + }) + + function publish( + def: D, + properties: z.output, + ) { + return Effect.gen(function* () { + const payload: Payload = { type: def.type, properties } + log.info("publishing", { type: def.type }) + + const ps = pubsubs.get(def.type) + if (ps) yield* PubSub.publish(ps, payload) + yield* PubSub.publish(wildcardPubSub, payload) + + GlobalBus.emit("event", { + directory: Instance.directory, + payload, + }) + }) } - }, - async (entry) => { - const wildcard = entry.subscriptions.get("*") - if (!wildcard) return - const event = { - type: InstanceDisposed.type, - properties: { - directory: Instance.directory, - }, + + function subscribe(def: D): Stream.Stream> { + log.info("subscribing", { type: def.type }) + return Stream.unwrap( + Effect.gen(function* () { + const ps = yield* getOrCreate(def.type) + return Stream.fromPubSub(ps) as Stream.Stream> + }), + ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) } - for (const sub of [...wildcard]) { - sub(event) + + function subscribeAll(): Stream.Stream { + log.info("subscribing", { type: "*" }) + return Stream.fromPubSub(wildcardPubSub).pipe( + Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))), + ) } - }, + + return Service.of({ publish, subscribe, subscribeAll }) + }), ) - export async function publish( - def: Definition, - properties: z.output, - ) { - const payload = { - type: def.type, - properties, - } - log.info("publishing", { - type: def.type, - }) - const pending = [] - for (const key of [def.type, "*"]) { - const match = state().subscriptions.get(key) - for (const sub of match ?? []) { - pending.push(sub(payload)) - } - } - GlobalBus.emit("event", { - directory: Instance.directory, - payload, - }) - return Promise.all(pending) + // --------------------------------------------------------------------------- + // Legacy adapters — plain function API wrapping the Effect service + // --------------------------------------------------------------------------- + + function runStream(stream: (svc: Interface) => Stream.Stream, callback: (event: any) => void) { + return runCallbackInstance( + Service.use((svc) => + stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), + ), + ) } - export function subscribe( - def: Definition, - callback: (event: { type: Definition["type"]; properties: z.infer }) => void, - ) { - return raw(def.type, callback) + export function publish(def: D, properties: z.output) { + return runPromiseInstance(Service.use((svc) => svc.publish(def, properties))) } - export function once( - def: Definition, - callback: (event: { - type: Definition["type"] - properties: z.infer - }) => "done" | undefined, + export function subscribe( + def: D, + callback: (event: Payload) => void, ) { - const unsub = subscribe(def, (event) => { - if (callback(event)) unsub() - }) + return runStream((svc) => svc.subscribe(def), callback) } export function subscribeAll(callback: (event: any) => void) { - return raw("*", callback) - } - - function raw(type: string, callback: (event: any) => void) { - log.info("subscribing", { type }) - const subscriptions = state().subscriptions - let match = subscriptions.get(type) ?? [] - match.push(callback) - subscriptions.set(type, match) - - return () => { - log.info("unsubscribing", { type }) - const match = subscriptions.get(type) - if (!match) return - const index = match.indexOf(callback) - if (index === -1) return - match.splice(index, 1) - } + return runStream((svc) => svc.subscribeAll(), callback) } } diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index c05458d5df9..fa3ec528f4a 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -1,4 +1,5 @@ import { Effect, Layer, LayerMap, ServiceMap } from "effect" +import { Bus } from "@/bus" import { File } from "@/file" import { FileTime } from "@/file/time" import { FileWatcher } from "@/file/watcher" @@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry" export { InstanceContext } from "./instance-context" export type InstanceServices = + | Bus.Service | Question.Service | PermissionNext.Service | ProviderAuth.Service @@ -36,6 +38,7 @@ export type InstanceServices = function lookup(_key: string) { const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current)) return Layer.mergeAll( + Layer.fresh(Bus.layer), Layer.fresh(Question.layer), Layer.fresh(PermissionNext.layer), Layer.fresh(ProviderAuth.defaultLayer), diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index f52203b2220..4465b106f47 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -18,6 +18,12 @@ export function runPromiseInstance(effect: Effect.Effect( + effect: Effect.Effect, +): (interruptor?: number) => void { + return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) +} + export function disposeRuntime() { return runtime.dispose() } diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index 64a21e79925..b3da0fc9eaa 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -192,43 +192,6 @@ describe("Bus", () => { }) }) - describe("once", () => { - test("fires once when callback returns 'done'", async () => { - await using tmp = await tmpdir() - const received: number[] = [] - - await withInstance(tmp.path, async () => { - Bus.once(TestEvent.Ping, (evt) => { - received.push(evt.properties.value) - return "done" - }) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bus.publish(TestEvent.Ping, { value: 2 }) - }) - - expect(received).toEqual([1]) - }) - - test("keeps listening when callback returns undefined", async () => { - await using tmp = await tmpdir() - const received: number[] = [] - - await withInstance(tmp.path, async () => { - Bus.once(TestEvent.Ping, (evt) => { - received.push(evt.properties.value) - if (evt.properties.value === 3) return "done" - return undefined - }) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bus.publish(TestEvent.Ping, { value: 2 }) - await Bus.publish(TestEvent.Ping, { value: 3 }) - await Bus.publish(TestEvent.Ping, { value: 4 }) - }) - - expect(received).toEqual([1, 2, 3]) - }) - }) - describe("GlobalBus forwarding", () => { test("publish emits to GlobalBus with directory", async () => { await using tmp = await tmpdir() @@ -284,37 +247,47 @@ describe("Bus", () => { }) describe("instance disposal", () => { - test("wildcard subscribers receive InstanceDisposed on disposal", async () => { + test("InstanceDisposed is emitted to GlobalBus on disposal", async () => { await using tmp = await tmpdir() - const events: Array<{ type: string }> = [] + const globalEvents: Array<{ directory?: string; payload: any }> = [] - await withInstance(tmp.path, async () => { - Bus.subscribeAll((evt) => events.push({ type: evt.type })) - }) + const handler = (evt: any) => globalEvents.push(evt) + GlobalBus.on("event", handler) - await Instance.disposeAll() + try { + await withInstance(tmp.path, async () => { + // Instance is active — subscribe so the layer gets created + Bus.subscribe(TestEvent.Ping, () => {}) + }) + + await Instance.disposeAll() - const disposed = events.find((e) => e.type === "server.instance.disposed") - expect(disposed).toBeDefined() + const disposed = globalEvents.find((e) => e.payload.type === "server.instance.disposed") + expect(disposed).toBeDefined() + expect(disposed!.payload.properties.directory).toBe(tmp.path) + } finally { + GlobalBus.off("event", handler) + } }) }) describe("async subscribers", () => { - test("publish awaits async subscriber promises", async () => { + test("publish is fire-and-forget (does not await subscriber callbacks)", async () => { await using tmp = await tmpdir() - const order: string[] = [] + const received: number[] = [] await withInstance(tmp.path, async () => { - Bus.subscribe(TestEvent.Ping, async () => { + Bus.subscribe(TestEvent.Ping, async (evt) => { await new Promise((r) => setTimeout(r, 10)) - order.push("async-done") + received.push(evt.properties.value) }) await Bus.publish(TestEvent.Ping, { value: 1 }) - order.push("after-publish") + // Give the async subscriber time to complete + await new Promise((r) => setTimeout(r, 50)) }) - expect(order).toEqual(["async-done", "after-publish"]) + expect(received).toEqual([1]) }) }) }) From 009d77c9d81e5188187bb9bc30fbc9111adba81e Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 22:09:14 -0400 Subject: [PATCH 4/9] refactor(format): make formatting explicit instead of bus-driven Replace the implicit Bus.subscribe(File.Event.Edited) formatter with an explicit Format.run(filepath) call in write/edit/apply_patch tools. This ensures formatting completes before FileTime stamps and LSP diagnostics run, rather than relying on the bus to block on subscribers. - Add Format.run() to the Effect service interface and legacy adapter - Call Format.run() in write, edit, and apply_patch tools after writes - Remove Bus subscription from Format layer --- packages/opencode/src/format/index.ts | 87 +++++++++++------------ packages/opencode/src/tool/apply_patch.ts | 2 + packages/opencode/src/tool/edit.ts | 3 + packages/opencode/src/tool/write.ts | 2 + 4 files changed, 50 insertions(+), 44 deletions(-) diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index 6da8caa08c8..8be6ceab6d1 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -4,9 +4,7 @@ import { InstanceContext } from "@/effect/instance-context" import path from "path" import { mergeDeep } from "remeda" import z from "zod" -import { Bus } from "../bus" import { Config } from "../config/config" -import { File } from "../file" import { Instance } from "../project/instance" import { Process } from "../util/process" import { Log } from "../util/log" @@ -27,6 +25,7 @@ export namespace Format { export type Status = z.infer export interface Interface { + readonly run: (filepath: string) => Effect.Effect readonly status: () => Effect.Effect } @@ -90,48 +89,44 @@ export namespace Format { return result } - yield* Effect.acquireRelease( - Effect.sync(() => - Bus.subscribe( - File.Event.Edited, - Instance.bind(async (payload) => { - const file = payload.properties.file - log.info("formatting", { file }) - const ext = path.extname(file) - - for (const item of await getFormatter(ext)) { - log.info("running", { command: item.command }) - try { - const proc = Process.spawn( - item.command.map((x) => x.replace("$FILE", file)), - { - cwd: instance.directory, - env: { ...process.env, ...item.environment }, - stdout: "ignore", - stderr: "ignore", - }, - ) - const exit = await proc.exited - if (exit !== 0) { - log.error("failed", { - command: item.command, - ...item.environment, - }) - } - } catch (error) { - log.error("failed to format file", { - error, - command: item.command, - ...item.environment, - file, - }) - } + const run = Effect.fn("Format.run")(function* (filepath: string) { + log.info("formatting", { file: filepath }) + const ext = path.extname(filepath) + + for (const item of yield* Effect.promise(() => getFormatter(ext))) { + log.info("running", { command: item.command }) + yield* Effect.tryPromise({ + try: async () => { + const proc = Process.spawn( + item.command.map((x) => x.replace("$FILE", filepath)), + { + cwd: instance.directory, + env: { ...process.env, ...item.environment }, + stdout: "ignore", + stderr: "ignore", + }, + ) + const exit = await proc.exited + if (exit !== 0) { + log.error("failed", { + command: item.command, + ...item.environment, + }) } - }), - ), - ), - (unsubscribe) => Effect.sync(unsubscribe), - ) + }, + catch: (error) => { + log.error("failed to format file", { + error, + command: item.command, + ...item.environment, + file: filepath, + }) + return error + }, + }).pipe(Effect.ignore) + } + }) + log.info("init") const status = Effect.fn("Format.status")(function* () { @@ -147,10 +142,14 @@ export namespace Format { return result }) - return Service.of({ status }) + return Service.of({ run, status }) }), ) + export async function run(filepath: string) { + return runPromiseInstance(Service.use((s) => s.run(filepath))) + } + export async function status() { return runPromiseInstance(Service.use((s) => s.status())) } diff --git a/packages/opencode/src/tool/apply_patch.ts b/packages/opencode/src/tool/apply_patch.ts index 06293b6eba6..7b28ba94bb2 100644 --- a/packages/opencode/src/tool/apply_patch.ts +++ b/packages/opencode/src/tool/apply_patch.ts @@ -10,6 +10,7 @@ import { createTwoFilesPatch, diffLines } from "diff" import { assertExternalDirectory } from "./external-directory" import { trimDiff } from "./edit" import { LSP } from "../lsp" +import { Format } from "../format" import { Filesystem } from "../util/filesystem" import DESCRIPTION from "./apply_patch.txt" import { File } from "../file" @@ -220,6 +221,7 @@ export const ApplyPatchTool = Tool.define("apply_patch", { } if (edited) { + await Format.run(edited) await Bus.publish(File.Event.Edited, { file: edited, }) diff --git a/packages/opencode/src/tool/edit.ts b/packages/opencode/src/tool/edit.ts index 1a7614fc17f..95226212df9 100644 --- a/packages/opencode/src/tool/edit.ts +++ b/packages/opencode/src/tool/edit.ts @@ -13,6 +13,7 @@ import { File } from "../file" import { FileWatcher } from "../file/watcher" import { Bus } from "../bus" import { FileTime } from "../file/time" +import { Format } from "../format" import { Filesystem } from "../util/filesystem" import { Instance } from "../project/instance" import { Snapshot } from "@/snapshot" @@ -71,6 +72,7 @@ export const EditTool = Tool.define("edit", { }, }) await Filesystem.write(filePath, params.newString) + await Format.run(filePath) await Bus.publish(File.Event.Edited, { file: filePath, }) @@ -108,6 +110,7 @@ export const EditTool = Tool.define("edit", { }) await Filesystem.write(filePath, contentNew) + await Format.run(filePath) await Bus.publish(File.Event.Edited, { file: filePath, }) diff --git a/packages/opencode/src/tool/write.ts b/packages/opencode/src/tool/write.ts index 83474a543ca..78d64fc4fc1 100644 --- a/packages/opencode/src/tool/write.ts +++ b/packages/opencode/src/tool/write.ts @@ -8,6 +8,7 @@ import { Bus } from "../bus" import { File } from "../file" import { FileWatcher } from "../file/watcher" import { FileTime } from "../file/time" +import { Format } from "../format" import { Filesystem } from "../util/filesystem" import { Instance } from "../project/instance" import { trimDiff } from "./edit" @@ -42,6 +43,7 @@ export const WriteTool = Tool.define("write", { }) await Filesystem.write(filepath, params.content) + await Format.run(filepath) await Bus.publish(File.Event.Edited, { file: filepath, }) From 0c2b5b2c397b6bc8cb06fe719af5674b2373560b Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 08:49:06 -0400 Subject: [PATCH 5/9] fix(bus): use Fiber.interrupt for clean disposal of subscribeAll Use forkInstance + Fiber.interrupt (which awaits) instead of runCallbackInstance + interruptUnsafe (fire-and-forget) for subscribeAll. This ensures the fiber completes before layer invalidation, allowing the RcMap refCount to drop to 0. subscribeAll now delivers InstanceDisposed as the last callback message via Effect.ensuring when the fiber is interrupted during disposal, but not on manual unsubscribe. Add priority support to registerDisposer so Bus can interrupt subscription fibers (priority -1) before layer invalidation (priority 0). Add forkInstance helper to effect/runtime that returns a Fiber instead of an interrupt function. --- packages/opencode/src/bus/index.ts | 49 +++++++++++++------ .../opencode/src/effect/instance-registry.ts | 23 +++++++-- packages/opencode/src/effect/runtime.ts | 8 ++- 3 files changed, 60 insertions(+), 20 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 5a1c106f4c5..b10c2996650 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,10 +1,11 @@ import z from "zod" -import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" +import { Effect, Fiber, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" -import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" +import { registerDisposer } from "../effect/instance-registry" +import { forkInstance, runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) @@ -51,10 +52,7 @@ export namespace Bus { return ps }) - function publish( - def: D, - properties: z.output, - ) { + function publish(def: D, properties: z.output) { return Effect.gen(function* () { const payload: Payload = { type: def.type, properties } log.info("publishing", { type: def.type }) @@ -97,9 +95,7 @@ export namespace Bus { function runStream(stream: (svc: Interface) => Stream.Stream, callback: (event: any) => void) { return runCallbackInstance( - Service.use((svc) => - stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), - ), + Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))), ) } @@ -107,14 +103,39 @@ export namespace Bus { return runPromiseInstance(Service.use((svc) => svc.publish(def, properties))) } - export function subscribe( - def: D, - callback: (event: Payload) => void, - ) { + export function subscribe(def: D, callback: (event: Payload) => void) { return runStream((svc) => svc.subscribe(def), callback) } export function subscribeAll(callback: (event: any) => void) { - return runStream((svc) => svc.subscribeAll(), callback) + const directory = Instance.directory + let manualUnsub = false + + const fiber = forkInstance( + Service.use((svc) => + svc.subscribeAll().pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), + ).pipe( + Effect.ensuring( + Effect.sync(() => { + if (!manualUnsub) { + callback({ type: InstanceDisposed.type, properties: { directory } }) + } + }), + ), + ), + ) + + // Interrupt the fiber before the layer is invalidated, awaiting + // completion so the refCount drops and the scope can close. + const unregister = registerDisposer( + (dir) => (dir !== directory ? Promise.resolve() : Effect.runPromise(Fiber.interrupt(fiber))), + -1, + ) + + return () => { + manualUnsub = true + unregister() + fiber.interruptUnsafe() + } } } diff --git a/packages/opencode/src/effect/instance-registry.ts b/packages/opencode/src/effect/instance-registry.ts index 59c556e0447..49b4abe137c 100644 --- a/packages/opencode/src/effect/instance-registry.ts +++ b/packages/opencode/src/effect/instance-registry.ts @@ -1,12 +1,25 @@ -const disposers = new Set<(directory: string) => Promise>() +const disposers = new Set<{ + fn: (directory: string) => Promise + priority: number +}>() -export function registerDisposer(disposer: (directory: string) => Promise) { - disposers.add(disposer) +export function registerDisposer(disposer: (directory: string) => Promise, priority = 0) { + const item = { + fn: disposer, + priority, + } + disposers.add(item) return () => { - disposers.delete(disposer) + disposers.delete(item) } } export async function disposeInstance(directory: string) { - await Promise.allSettled([...disposers].map((disposer) => disposer(directory))) + const list = [...disposers].sort((a, b) => a.priority - b.priority) + const seen = new Set() + for (const item of list) { + if (seen.has(item.priority)) continue + seen.add(item.priority) + await Promise.allSettled(list.filter((x) => x.priority === item.priority).map((x) => x.fn(directory))) + } } diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index 4465b106f47..794e5978e9c 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -1,4 +1,4 @@ -import { Effect, Layer, ManagedRuntime } from "effect" +import { Effect, Fiber, Layer, ManagedRuntime } from "effect" import { AccountEffect } from "@/account/effect" import { AuthEffect } from "@/auth/effect" import { Instances } from "@/effect/instances" @@ -18,6 +18,12 @@ export function runPromiseInstance(effect: Effect.Effect( + effect: Effect.Effect, +): Fiber.Fiber { + return runtime.runFork(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) +} + export function runCallbackInstance( effect: Effect.Effect, ): (interruptor?: number) => void { From 992f4f794ad20f8ce828499caf3e170881797474 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 09:40:39 -0400 Subject: [PATCH 6/9] fix(bus): use GlobalBus for InstanceDisposed in legacy subscribeAll MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The sync callback API can't wait for async layer acquisition, so delivering InstanceDisposed through the PubSub stream is a race condition. Instead, the legacy subscribeAll adapter listens on GlobalBus for InstanceDisposed matching the current directory. The Effect service's stream ending IS the disposal signal for Effect consumers — this is only needed for the legacy callback API. Also reverts forceInvalidate, fiber tracking, priority-based disposal, and other workaround attempts. Clean simple solution. --- packages/opencode/src/bus/index.ts | 42 +++++++------------ .../opencode/src/effect/instance-registry.ts | 23 +++------- packages/opencode/src/effect/runtime.ts | 8 +--- packages/opencode/test/bus/bus.test.ts | 33 +++++++++++++++ 4 files changed, 54 insertions(+), 52 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index b10c2996650..49b2a2335dd 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,11 +1,10 @@ import z from "zod" -import { Effect, Fiber, Layer, PubSub, ServiceMap, Stream } from "effect" +import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" -import { registerDisposer } from "../effect/instance-registry" -import { forkInstance, runCallbackInstance, runPromiseInstance } from "../effect/runtime" +import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) @@ -109,33 +108,22 @@ export namespace Bus { export function subscribeAll(callback: (event: any) => void) { const directory = Instance.directory - let manualUnsub = false - - const fiber = forkInstance( - Service.use((svc) => - svc.subscribeAll().pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), - ).pipe( - Effect.ensuring( - Effect.sync(() => { - if (!manualUnsub) { - callback({ type: InstanceDisposed.type, properties: { directory } }) - } - }), - ), - ), - ) - // Interrupt the fiber before the layer is invalidated, awaiting - // completion so the refCount drops and the scope can close. - const unregister = registerDisposer( - (dir) => (dir !== directory ? Promise.resolve() : Effect.runPromise(Fiber.interrupt(fiber))), - -1, - ) + // InstanceDisposed is delivered via GlobalBus because the sync + // callback API can't wait for async layer acquisition. The Effect + // service's stream ending IS the disposal signal for Effect consumers. + const onDispose = (evt: { directory?: string; payload: any }) => { + if (evt.payload.type !== InstanceDisposed.type) return + if (evt.directory !== directory) return + callback(evt.payload) + GlobalBus.off("event", onDispose) + } + GlobalBus.on("event", onDispose) + const interrupt = runStream((svc) => svc.subscribeAll(), callback) return () => { - manualUnsub = true - unregister() - fiber.interruptUnsafe() + GlobalBus.off("event", onDispose) + interrupt() } } } diff --git a/packages/opencode/src/effect/instance-registry.ts b/packages/opencode/src/effect/instance-registry.ts index 49b4abe137c..59c556e0447 100644 --- a/packages/opencode/src/effect/instance-registry.ts +++ b/packages/opencode/src/effect/instance-registry.ts @@ -1,25 +1,12 @@ -const disposers = new Set<{ - fn: (directory: string) => Promise - priority: number -}>() +const disposers = new Set<(directory: string) => Promise>() -export function registerDisposer(disposer: (directory: string) => Promise, priority = 0) { - const item = { - fn: disposer, - priority, - } - disposers.add(item) +export function registerDisposer(disposer: (directory: string) => Promise) { + disposers.add(disposer) return () => { - disposers.delete(item) + disposers.delete(disposer) } } export async function disposeInstance(directory: string) { - const list = [...disposers].sort((a, b) => a.priority - b.priority) - const seen = new Set() - for (const item of list) { - if (seen.has(item.priority)) continue - seen.add(item.priority) - await Promise.allSettled(list.filter((x) => x.priority === item.priority).map((x) => x.fn(directory))) - } + await Promise.allSettled([...disposers].map((disposer) => disposer(directory))) } diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index 794e5978e9c..4465b106f47 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -1,4 +1,4 @@ -import { Effect, Fiber, Layer, ManagedRuntime } from "effect" +import { Effect, Layer, ManagedRuntime } from "effect" import { AccountEffect } from "@/account/effect" import { AuthEffect } from "@/auth/effect" import { Instances } from "@/effect/instances" @@ -18,12 +18,6 @@ export function runPromiseInstance(effect: Effect.Effect( - effect: Effect.Effect, -): Fiber.Fiber { - return runtime.runFork(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) -} - export function runCallbackInstance( effect: Effect.Effect, ): (interruptor?: number) => void { diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index b3da0fc9eaa..a8929aa1c93 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -4,6 +4,7 @@ import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" import { GlobalBus } from "../../src/bus/global" import { Instance } from "../../src/project/instance" +import { Log } from "../../src/util/log" import { tmpdir } from "../fixture/fixture" // --------------------------------------------------------------------------- @@ -190,6 +191,38 @@ describe("Bus", () => { expect(all).toEqual(["test.ping"]) }) + + test("subscribeAll delivers InstanceDisposed via GlobalBus on disposal", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + }) + + await Instance.disposeAll() + + expect(all).toContain(Bus.InstanceDisposed.type) + }) + + test("manual unsubscribe suppresses InstanceDisposed", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + let unsub = () => {} + + await withInstance(tmp.path, async () => { + unsub = Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + }) + + unsub() + await Instance.disposeAll() + + expect(all).not.toContain(Bus.InstanceDisposed.type) + }) }) describe("GlobalBus forwarding", () => { From 81f71c9b30cd2814096b174d2b702245637bd488 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 12:17:39 -0400 Subject: [PATCH 7/9] fix(bus): GlobalBus bridge for InstanceDisposed + forceInvalidate + Effect tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Legacy subscribeAll delivers InstanceDisposed via GlobalBus because the fiber starts asynchronously and may not be running when disposal happens. This bridge can be removed once upstream PubSub.shutdown properly wakes suspended subscribers. Add forceInvalidate in Instances that closes the RcMap entry scope regardless of refCount. Standard RcMap.invalidate bails when refCount > 0 — an upstream issue (Effect-TS/effect-smol#1799). Add PubSub shutdown finalizer to Bus layer so layer teardown properly cleans up PubSubs. Add Effect-native tests proving forkScoped + scope closure works correctly: ensuring fires when the scope closes, streams receive published events. Remove stale GlobalBus disposal test (instance.ts responsibility). --- packages/opencode/src/bus/index.ts | 22 ++++- packages/opencode/src/effect/instances.ts | 20 ++++- packages/opencode/test/bus/bus.test.ts | 98 +++++++++++++++++------ 3 files changed, 109 insertions(+), 31 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 49b2a2335dd..86bdd3d878d 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -84,6 +84,19 @@ export namespace Bus { ) } + // Shut down all PubSubs when the layer is torn down. + // This causes Stream.fromPubSub consumers to end, triggering + // their ensuring/finalizers. + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + log.info("shutting down PubSubs") + yield* PubSub.shutdown(wildcardPubSub) + for (const ps of pubsubs.values()) { + yield* PubSub.shutdown(ps) + } + }), + ) + return Service.of({ publish, subscribe, subscribeAll }) }), ) @@ -109,9 +122,12 @@ export namespace Bus { export function subscribeAll(callback: (event: any) => void) { const directory = Instance.directory - // InstanceDisposed is delivered via GlobalBus because the sync - // callback API can't wait for async layer acquisition. The Effect - // service's stream ending IS the disposal signal for Effect consumers. + // InstanceDisposed is delivered via GlobalBus because the legacy + // adapter's fiber starts asynchronously and may not be running when + // disposal happens. In the Effect-native path, forkScoped + scope + // closure handles this correctly. This bridge can be removed once + // upstream PubSub.shutdown properly wakes suspended subscribers: + // https://github.com/Effect-TS/effect-smol/issues/TBD const onDispose = (evt: { directory?: string; payload: any }) => { if (evt.payload.type !== InstanceDisposed.type) return if (evt.directory !== directory) return diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index fa3ec528f4a..457b3518eec 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -1,4 +1,4 @@ -import { Effect, Layer, LayerMap, ServiceMap } from "effect" +import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect" import { Bus } from "@/bus" import { File } from "@/file" import { FileTime } from "@/file/time" @@ -59,7 +59,23 @@ export class Instances extends ServiceMap.Service Effect.runPromise(layerMap.invalidate(directory))) + + // Force-invalidate closes the RcMap entry scope even when refCount > 0. + // Standard RcMap.invalidate bails in that case, leaving long-running + // consumer fibers orphaned. This is an upstream issue: + // https://github.com/Effect-TS/effect-smol/pull/1799 + const forceInvalidate = (directory: string) => + Effect.gen(function* () { + const rcMap = layerMap.rcMap + if (rcMap.state._tag === "Closed") return + const entry = MutableHashMap.get(rcMap.state.map, directory) + if (entry._tag === "None") return + MutableHashMap.remove(rcMap.state.map, directory) + if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber) + yield* Scope.close(entry.value.scope, Exit.void) + }).pipe(Effect.uninterruptible) + + const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory))) yield* Effect.addFinalizer(() => Effect.sync(unregister)) return Instances.of(layerMap) }), diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index a8929aa1c93..2c154049a64 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -1,10 +1,10 @@ import { afterEach, describe, expect, test } from "bun:test" +import { Deferred, Effect, Stream } from "effect" import z from "zod" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" import { GlobalBus } from "../../src/bus/global" import { Instance } from "../../src/project/instance" -import { Log } from "../../src/util/log" import { tmpdir } from "../fixture/fixture" // --------------------------------------------------------------------------- @@ -192,7 +192,7 @@ describe("Bus", () => { expect(all).toEqual(["test.ping"]) }) - test("subscribeAll delivers InstanceDisposed via GlobalBus on disposal", async () => { + test("subscribeAll delivers InstanceDisposed on disposal", async () => { await using tmp = await tmpdir() const all: string[] = [] @@ -200,10 +200,12 @@ describe("Bus", () => { Bus.subscribeAll((evt) => { all.push(evt.type) }) + await Bus.publish(TestEvent.Ping, { value: 1 }) }) await Instance.disposeAll() + expect(all).toContain("test.ping") expect(all).toContain(Bus.InstanceDisposed.type) }) @@ -279,30 +281,6 @@ describe("Bus", () => { }) }) - describe("instance disposal", () => { - test("InstanceDisposed is emitted to GlobalBus on disposal", async () => { - await using tmp = await tmpdir() - const globalEvents: Array<{ directory?: string; payload: any }> = [] - - const handler = (evt: any) => globalEvents.push(evt) - GlobalBus.on("event", handler) - - try { - await withInstance(tmp.path, async () => { - // Instance is active — subscribe so the layer gets created - Bus.subscribe(TestEvent.Ping, () => {}) - }) - - await Instance.disposeAll() - - const disposed = globalEvents.find((e) => e.payload.type === "server.instance.disposed") - expect(disposed).toBeDefined() - expect(disposed!.payload.properties.directory).toBe(tmp.path) - } finally { - GlobalBus.off("event", handler) - } - }) - }) describe("async subscribers", () => { test("publish is fire-and-forget (does not await subscriber callbacks)", async () => { @@ -323,4 +301,72 @@ describe("Bus", () => { expect(received).toEqual([1]) }) }) + + describe("Effect service", () => { + test("subscribeAll stream receives published events", async () => { + await using tmp = await tmpdir() + const received: string[] = [] + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + const done = yield* Deferred.make() + let count = 0 + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach((msg) => + Effect.gen(function* () { + received.push(msg.type) + if (++count >= 2) yield* Deferred.succeed(done, undefined) + }), + ), + ), + ) + + // Let the forked fiber start and subscribe to the PubSub + yield* Effect.yieldNow + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* svc.publish(TestEvent.Pong, { message: "hi" }) + yield* Deferred.await(done) + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(received).toEqual(["test.ping", "test.pong"]) + }) + + test("subscribeAll stream ends with ensuring when scope closes", async () => { + await using tmp = await tmpdir() + let ensuringFired = false + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach(() => Effect.void), + Effect.ensuring(Effect.sync(() => { + ensuringFired = true + })), + ), + ) + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* Effect.yieldNow + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(ensuringFired).toBe(true) + }) + }) }) From 9be68a9fa426fb4b6f699d6905805cea7625ad76 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 12:40:07 -0400 Subject: [PATCH 8/9] fix: link to upstream PubSub shutdown PR --- packages/opencode/src/bus/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 86bdd3d878d..e8915205a8f 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -127,7 +127,7 @@ export namespace Bus { // disposal happens. In the Effect-native path, forkScoped + scope // closure handles this correctly. This bridge can be removed once // upstream PubSub.shutdown properly wakes suspended subscribers: - // https://github.com/Effect-TS/effect-smol/issues/TBD + // https://github.com/Effect-TS/effect-smol/pull/1800 const onDispose = (evt: { directory?: string; payload: any }) => { if (evt.payload.type !== InstanceDisposed.type) return if (evt.directory !== directory) return From 8a3aa943ddc9ba03accc3704257c49c86ba5e48d Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 12:55:59 -0400 Subject: [PATCH 9/9] fix: suppress unhandled interrupt error from forceInvalidate --- packages/opencode/src/effect/instances.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index 457b3518eec..c1acef45124 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -73,7 +73,7 @@ export class Instances extends ServiceMap.Service Effect.runPromise(forceInvalidate(directory))) yield* Effect.addFinalizer(() => Effect.sync(unregister))