diff --git a/packages/opencode/src/bus/bus-event.ts b/packages/opencode/src/bus/bus-event.ts index 7fe13833c86..1d9a31d4a27 100644 --- a/packages/opencode/src/bus/bus-event.ts +++ b/packages/opencode/src/bus/bus-event.ts @@ -1,5 +1,5 @@ import z from "zod" -import type { ZodType } from "zod" +import type { ZodObject, ZodRawShape } from "zod" import { Log } from "../util/log" export namespace BusEvent { @@ -9,7 +9,7 @@ export namespace BusEvent { const registry = new Map() - export function define(type: Type, properties: Properties) { + export function define>(type: Type, properties: Properties) { const result = { type, properties, diff --git a/packages/opencode/src/bus/global.ts b/packages/opencode/src/bus/global.ts index 43386dd6b20..dcc7664007e 100644 --- a/packages/opencode/src/bus/global.ts +++ b/packages/opencode/src/bus/global.ts @@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{ event: [ { directory?: string - payload: any + payload: { type: string; properties: Record } }, ] }>() diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..e8915205a8f 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,12 +1,13 @@ import z from "zod" +import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" +import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) - type Subscription = (event: any) => void export const InstanceDisposed = BusEvent.define( "server.instance.disposed", @@ -15,91 +16,130 @@ export namespace Bus { }), ) - const state = Instance.state( - () => { - const subscriptions = new Map() + // --------------------------------------------------------------------------- + // Service definition + // --------------------------------------------------------------------------- - return { - subscriptions, + type Payload = { + type: D["type"] + properties: z.infer + } + + export interface Interface { + readonly publish: ( + def: D, + properties: z.output, + ) => Effect.Effect + readonly subscribe: (def: D) => Stream.Stream> + readonly subscribeAll: () => Stream.Stream + } + + export class Service extends ServiceMap.Service()("@opencode/Bus") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const pubsubs = new Map>() + const wildcardPubSub = yield* PubSub.unbounded() + + const getOrCreate = Effect.fnUntraced(function* (type: string) { + let ps = pubsubs.get(type) + if (!ps) { + ps = yield* PubSub.unbounded() + pubsubs.set(type, ps) + } + return ps + }) + + function publish(def: D, properties: z.output) { + return Effect.gen(function* () { + const payload: Payload = { type: def.type, properties } + log.info("publishing", { type: def.type }) + + const ps = pubsubs.get(def.type) + if (ps) yield* PubSub.publish(ps, payload) + yield* PubSub.publish(wildcardPubSub, payload) + + GlobalBus.emit("event", { + directory: Instance.directory, + payload, + }) + }) } - }, - async (entry) => { - const wildcard = entry.subscriptions.get("*") - if (!wildcard) return - const event = { - type: InstanceDisposed.type, - properties: { - directory: Instance.directory, - }, + + function subscribe(def: D): Stream.Stream> { + log.info("subscribing", { type: def.type }) + return Stream.unwrap( + Effect.gen(function* () { + const ps = yield* getOrCreate(def.type) + return Stream.fromPubSub(ps) as Stream.Stream> + }), + ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) } - for (const sub of [...wildcard]) { - sub(event) + + function subscribeAll(): Stream.Stream { + log.info("subscribing", { type: "*" }) + return Stream.fromPubSub(wildcardPubSub).pipe( + Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))), + ) } - }, + + // Shut down all PubSubs when the layer is torn down. + // This causes Stream.fromPubSub consumers to end, triggering + // their ensuring/finalizers. + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + log.info("shutting down PubSubs") + yield* PubSub.shutdown(wildcardPubSub) + for (const ps of pubsubs.values()) { + yield* PubSub.shutdown(ps) + } + }), + ) + + return Service.of({ publish, subscribe, subscribeAll }) + }), ) - export async function publish( - def: Definition, - properties: z.output, - ) { - const payload = { - type: def.type, - properties, - } - log.info("publishing", { - type: def.type, - }) - const pending = [] - for (const key of [def.type, "*"]) { - const match = state().subscriptions.get(key) - for (const sub of match ?? []) { - pending.push(sub(payload)) - } - } - GlobalBus.emit("event", { - directory: Instance.directory, - payload, - }) - return Promise.all(pending) + // --------------------------------------------------------------------------- + // Legacy adapters — plain function API wrapping the Effect service + // --------------------------------------------------------------------------- + + function runStream(stream: (svc: Interface) => Stream.Stream, callback: (event: any) => void) { + return runCallbackInstance( + Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))), + ) } - export function subscribe( - def: Definition, - callback: (event: { type: Definition["type"]; properties: z.infer }) => void, - ) { - return raw(def.type, callback) + export function publish(def: D, properties: z.output) { + return runPromiseInstance(Service.use((svc) => svc.publish(def, properties))) } - export function once( - def: Definition, - callback: (event: { - type: Definition["type"] - properties: z.infer - }) => "done" | undefined, - ) { - const unsub = subscribe(def, (event) => { - if (callback(event)) unsub() - }) + export function subscribe(def: D, callback: (event: Payload) => void) { + return runStream((svc) => svc.subscribe(def), callback) } export function subscribeAll(callback: (event: any) => void) { - return raw("*", callback) - } + const directory = Instance.directory - function raw(type: string, callback: (event: any) => void) { - log.info("subscribing", { type }) - const subscriptions = state().subscriptions - let match = subscriptions.get(type) ?? [] - match.push(callback) - subscriptions.set(type, match) + // InstanceDisposed is delivered via GlobalBus because the legacy + // adapter's fiber starts asynchronously and may not be running when + // disposal happens. In the Effect-native path, forkScoped + scope + // closure handles this correctly. This bridge can be removed once + // upstream PubSub.shutdown properly wakes suspended subscribers: + // https://github.com/Effect-TS/effect-smol/pull/1800 + const onDispose = (evt: { directory?: string; payload: any }) => { + if (evt.payload.type !== InstanceDisposed.type) return + if (evt.directory !== directory) return + callback(evt.payload) + GlobalBus.off("event", onDispose) + } + GlobalBus.on("event", onDispose) + const interrupt = runStream((svc) => svc.subscribeAll(), callback) return () => { - log.info("unsubscribing", { type }) - const match = subscriptions.get(type) - if (!match) return - const index = match.indexOf(callback) - if (index === -1) return - match.splice(index, 1) + GlobalBus.off("event", onDispose) + interrupt() } } } diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index c3c28ed6057..d1f884f35a5 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -123,7 +123,7 @@ export namespace Workspace { await parseSSE(res.body, stop, (event) => { GlobalBus.emit("event", { directory: space.id, - payload: event, + payload: event as { type: string; properties: Record }, }) }) // Wait 250ms and retry if SSE connection fails diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index c05458d5df9..c1acef45124 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -1,4 +1,5 @@ -import { Effect, Layer, LayerMap, ServiceMap } from "effect" +import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect" +import { Bus } from "@/bus" import { File } from "@/file" import { FileTime } from "@/file/time" import { FileWatcher } from "@/file/watcher" @@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry" export { InstanceContext } from "./instance-context" export type InstanceServices = + | Bus.Service | Question.Service | PermissionNext.Service | ProviderAuth.Service @@ -36,6 +38,7 @@ export type InstanceServices = function lookup(_key: string) { const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current)) return Layer.mergeAll( + Layer.fresh(Bus.layer), Layer.fresh(Question.layer), Layer.fresh(PermissionNext.layer), Layer.fresh(ProviderAuth.defaultLayer), @@ -56,7 +59,23 @@ export class Instances extends ServiceMap.Service Effect.runPromise(layerMap.invalidate(directory))) + + // Force-invalidate closes the RcMap entry scope even when refCount > 0. + // Standard RcMap.invalidate bails in that case, leaving long-running + // consumer fibers orphaned. This is an upstream issue: + // https://github.com/Effect-TS/effect-smol/pull/1799 + const forceInvalidate = (directory: string) => + Effect.gen(function* () { + const rcMap = layerMap.rcMap + if (rcMap.state._tag === "Closed") return + const entry = MutableHashMap.get(rcMap.state.map, directory) + if (entry._tag === "None") return + MutableHashMap.remove(rcMap.state.map, directory) + if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber) + yield* Scope.close(entry.value.scope, Exit.void) + }).pipe(Effect.uninterruptible, Effect.ignore) + + const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory))) yield* Effect.addFinalizer(() => Effect.sync(unregister)) return Instances.of(layerMap) }), diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index f52203b2220..4465b106f47 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -18,6 +18,12 @@ export function runPromiseInstance(effect: Effect.Effect( + effect: Effect.Effect, +): (interruptor?: number) => void { + return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) +} + export function disposeRuntime() { return runtime.dispose() } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index 6da8caa08c8..8be6ceab6d1 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -4,9 +4,7 @@ import { InstanceContext } from "@/effect/instance-context" import path from "path" import { mergeDeep } from "remeda" import z from "zod" -import { Bus } from "../bus" import { Config } from "../config/config" -import { File } from "../file" import { Instance } from "../project/instance" import { Process } from "../util/process" import { Log } from "../util/log" @@ -27,6 +25,7 @@ export namespace Format { export type Status = z.infer export interface Interface { + readonly run: (filepath: string) => Effect.Effect readonly status: () => Effect.Effect } @@ -90,48 +89,44 @@ export namespace Format { return result } - yield* Effect.acquireRelease( - Effect.sync(() => - Bus.subscribe( - File.Event.Edited, - Instance.bind(async (payload) => { - const file = payload.properties.file - log.info("formatting", { file }) - const ext = path.extname(file) - - for (const item of await getFormatter(ext)) { - log.info("running", { command: item.command }) - try { - const proc = Process.spawn( - item.command.map((x) => x.replace("$FILE", file)), - { - cwd: instance.directory, - env: { ...process.env, ...item.environment }, - stdout: "ignore", - stderr: "ignore", - }, - ) - const exit = await proc.exited - if (exit !== 0) { - log.error("failed", { - command: item.command, - ...item.environment, - }) - } - } catch (error) { - log.error("failed to format file", { - error, - command: item.command, - ...item.environment, - file, - }) - } + const run = Effect.fn("Format.run")(function* (filepath: string) { + log.info("formatting", { file: filepath }) + const ext = path.extname(filepath) + + for (const item of yield* Effect.promise(() => getFormatter(ext))) { + log.info("running", { command: item.command }) + yield* Effect.tryPromise({ + try: async () => { + const proc = Process.spawn( + item.command.map((x) => x.replace("$FILE", filepath)), + { + cwd: instance.directory, + env: { ...process.env, ...item.environment }, + stdout: "ignore", + stderr: "ignore", + }, + ) + const exit = await proc.exited + if (exit !== 0) { + log.error("failed", { + command: item.command, + ...item.environment, + }) } - }), - ), - ), - (unsubscribe) => Effect.sync(unsubscribe), - ) + }, + catch: (error) => { + log.error("failed to format file", { + error, + command: item.command, + ...item.environment, + file: filepath, + }) + return error + }, + }).pipe(Effect.ignore) + } + }) + log.info("init") const status = Effect.fn("Format.status")(function* () { @@ -147,10 +142,14 @@ export namespace Format { return result }) - return Service.of({ status }) + return Service.of({ run, status }) }), ) + export async function run(filepath: string) { + return runPromiseInstance(Service.use((s) => s.run(filepath))) + } + export async function status() { return runPromiseInstance(Service.use((s) => s.status())) } diff --git a/packages/opencode/src/tool/apply_patch.ts b/packages/opencode/src/tool/apply_patch.ts index 06293b6eba6..7b28ba94bb2 100644 --- a/packages/opencode/src/tool/apply_patch.ts +++ b/packages/opencode/src/tool/apply_patch.ts @@ -10,6 +10,7 @@ import { createTwoFilesPatch, diffLines } from "diff" import { assertExternalDirectory } from "./external-directory" import { trimDiff } from "./edit" import { LSP } from "../lsp" +import { Format } from "../format" import { Filesystem } from "../util/filesystem" import DESCRIPTION from "./apply_patch.txt" import { File } from "../file" @@ -220,6 +221,7 @@ export const ApplyPatchTool = Tool.define("apply_patch", { } if (edited) { + await Format.run(edited) await Bus.publish(File.Event.Edited, { file: edited, }) diff --git a/packages/opencode/src/tool/edit.ts b/packages/opencode/src/tool/edit.ts index 1a7614fc17f..95226212df9 100644 --- a/packages/opencode/src/tool/edit.ts +++ b/packages/opencode/src/tool/edit.ts @@ -13,6 +13,7 @@ import { File } from "../file" import { FileWatcher } from "../file/watcher" import { Bus } from "../bus" import { FileTime } from "../file/time" +import { Format } from "../format" import { Filesystem } from "../util/filesystem" import { Instance } from "../project/instance" import { Snapshot } from "@/snapshot" @@ -71,6 +72,7 @@ export const EditTool = Tool.define("edit", { }, }) await Filesystem.write(filePath, params.newString) + await Format.run(filePath) await Bus.publish(File.Event.Edited, { file: filePath, }) @@ -108,6 +110,7 @@ export const EditTool = Tool.define("edit", { }) await Filesystem.write(filePath, contentNew) + await Format.run(filePath) await Bus.publish(File.Event.Edited, { file: filePath, }) diff --git a/packages/opencode/src/tool/write.ts b/packages/opencode/src/tool/write.ts index 83474a543ca..78d64fc4fc1 100644 --- a/packages/opencode/src/tool/write.ts +++ b/packages/opencode/src/tool/write.ts @@ -8,6 +8,7 @@ import { Bus } from "../bus" import { File } from "../file" import { FileWatcher } from "../file/watcher" import { FileTime } from "../file/time" +import { Format } from "../format" import { Filesystem } from "../util/filesystem" import { Instance } from "../project/instance" import { trimDiff } from "./edit" @@ -42,6 +43,7 @@ export const WriteTool = Tool.define("write", { }) await Filesystem.write(filepath, params.content) + await Format.run(filepath) await Bus.publish(File.Event.Edited, { file: filepath, }) diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts new file mode 100644 index 00000000000..2c154049a64 --- /dev/null +++ b/packages/opencode/test/bus/bus.test.ts @@ -0,0 +1,372 @@ +import { afterEach, describe, expect, test } from "bun:test" +import { Deferred, Effect, Stream } from "effect" +import z from "zod" +import { Bus } from "../../src/bus" +import { BusEvent } from "../../src/bus/bus-event" +import { GlobalBus } from "../../src/bus/global" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +// --------------------------------------------------------------------------- +// Test event definitions +// --------------------------------------------------------------------------- + +const TestEvent = { + Ping: BusEvent.define("test.ping", z.object({ value: z.number() })), + Pong: BusEvent.define("test.pong", z.object({ message: z.string() })), +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function withInstance(directory: string, fn: () => Promise) { + return Instance.provide({ directory, fn }) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("Bus", () => { + afterEach(() => Instance.disposeAll()) + + describe("publish + subscribe", () => { + test("subscriber receives matching events", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Ping, { value: 42 }) + await Bus.publish(TestEvent.Ping, { value: 99 }) + }) + + expect(received).toEqual([42, 99]) + }) + + test("subscriber does not receive events of other types", async () => { + await using tmp = await tmpdir() + const pings: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + pings.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Pong, { message: "hello" }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + + expect(pings).toEqual([1]) + }) + + test("publish with no subscribers does not throw", async () => { + await using tmp = await tmpdir() + + await withInstance(tmp.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + }) + }) + + describe("multiple subscribers", () => { + test("all subscribers for same event type are called", async () => { + await using tmp = await tmpdir() + const a: number[] = [] + const b: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value)) + Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value)) + await Bus.publish(TestEvent.Ping, { value: 7 }) + }) + + expect(a).toEqual([7]) + expect(b).toEqual([7]) + }) + + test("subscribers are called in registration order", async () => { + await using tmp = await tmpdir() + const order: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, () => order.push("first")) + Bus.subscribe(TestEvent.Ping, () => order.push("second")) + Bus.subscribe(TestEvent.Ping, () => order.push("third")) + await Bus.publish(TestEvent.Ping, { value: 0 }) + }) + + expect(order).toEqual(["first", "second", "third"]) + }) + }) + + describe("unsubscribe", () => { + test("unsubscribe stops delivery", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsub() + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(received).toEqual([1]) + }) + + test("unsubscribe is idempotent", async () => { + await using tmp = await tmpdir() + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent.Ping, () => {}) + unsub() + unsub() // should not throw + }) + }) + + test("unsubscribing one does not affect others", async () => { + await using tmp = await tmpdir() + const a: number[] = [] + const b: number[] = [] + + await withInstance(tmp.path, async () => { + const unsubA = Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value)) + Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value)) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsubA() + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(a).toEqual([1]) + expect(b).toEqual([1, 2]) + }) + }) + + describe("subscribeAll", () => { + test("receives events of all types", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Pong, { message: "hi" }) + }) + + expect(all).toEqual(["test.ping", "test.pong"]) + }) + + test("subscribeAll + typed subscribe both fire", async () => { + await using tmp = await tmpdir() + const typed: number[] = [] + const wild: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => typed.push(evt.properties.value)) + Bus.subscribeAll((evt) => wild.push(evt.type)) + await Bus.publish(TestEvent.Ping, { value: 5 }) + }) + + expect(typed).toEqual([5]) + expect(wild).toEqual(["test.ping"]) + }) + + test("unsubscribe from subscribeAll", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribeAll((evt) => all.push(evt.type)) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsub() + await Bus.publish(TestEvent.Pong, { message: "missed" }) + }) + + expect(all).toEqual(["test.ping"]) + }) + + test("subscribeAll delivers InstanceDisposed on disposal", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + + await Instance.disposeAll() + + expect(all).toContain("test.ping") + expect(all).toContain(Bus.InstanceDisposed.type) + }) + + test("manual unsubscribe suppresses InstanceDisposed", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + let unsub = () => {} + + await withInstance(tmp.path, async () => { + unsub = Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + }) + + unsub() + await Instance.disposeAll() + + expect(all).not.toContain(Bus.InstanceDisposed.type) + }) + }) + + describe("GlobalBus forwarding", () => { + test("publish emits to GlobalBus with directory", async () => { + await using tmp = await tmpdir() + const globalEvents: Array<{ directory?: string; payload: any }> = [] + + const handler = (evt: any) => globalEvents.push(evt) + GlobalBus.on("event", handler) + + try { + await withInstance(tmp.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 42 }) + }) + + const ping = globalEvents.find((e) => e.payload.type === "test.ping") + expect(ping).toBeDefined() + expect(ping!.directory).toBe(tmp.path) + expect(ping!.payload).toEqual({ + type: "test.ping", + properties: { value: 42 }, + }) + } finally { + GlobalBus.off("event", handler) + } + }) + }) + + describe("instance isolation", () => { + test("subscribers in one instance do not receive events from another", async () => { + await using tmpA = await tmpdir() + await using tmpB = await tmpdir() + const eventsA: number[] = [] + const eventsB: number[] = [] + + await withInstance(tmpA.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => eventsA.push(evt.properties.value)) + }) + + await withInstance(tmpB.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => eventsB.push(evt.properties.value)) + }) + + await withInstance(tmpA.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + + await withInstance(tmpB.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(eventsA).toEqual([1]) + expect(eventsB).toEqual([2]) + }) + }) + + + describe("async subscribers", () => { + test("publish is fire-and-forget (does not await subscriber callbacks)", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, async (evt) => { + await new Promise((r) => setTimeout(r, 10)) + received.push(evt.properties.value) + }) + + await Bus.publish(TestEvent.Ping, { value: 1 }) + // Give the async subscriber time to complete + await new Promise((r) => setTimeout(r, 50)) + }) + + expect(received).toEqual([1]) + }) + }) + + describe("Effect service", () => { + test("subscribeAll stream receives published events", async () => { + await using tmp = await tmpdir() + const received: string[] = [] + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + const done = yield* Deferred.make() + let count = 0 + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach((msg) => + Effect.gen(function* () { + received.push(msg.type) + if (++count >= 2) yield* Deferred.succeed(done, undefined) + }), + ), + ), + ) + + // Let the forked fiber start and subscribe to the PubSub + yield* Effect.yieldNow + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* svc.publish(TestEvent.Pong, { message: "hi" }) + yield* Deferred.await(done) + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(received).toEqual(["test.ping", "test.pong"]) + }) + + test("subscribeAll stream ends with ensuring when scope closes", async () => { + await using tmp = await tmpdir() + let ensuringFired = false + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach(() => Effect.void), + Effect.ensuring(Effect.sync(() => { + ensuringFired = true + })), + ), + ) + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* Effect.yieldNow + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(ensuringFired).toBe(true) + }) + }) +}) diff --git a/packages/opencode/test/file/watcher.test.ts b/packages/opencode/test/file/watcher.test.ts index 2cd27643e88..6d4c5f402fd 100644 --- a/packages/opencode/test/file/watcher.test.ts +++ b/packages/opencode/test/file/watcher.test.ts @@ -5,9 +5,9 @@ import path from "path" import { Deferred, Effect, Option } from "effect" import { tmpdir } from "../fixture/fixture" import { watcherConfigLayer, withServices } from "../fixture/instance" +import { Bus } from "../../src/bus" import { FileWatcher } from "../../src/file/watcher" import { Instance } from "../../src/project/instance" -import { GlobalBus } from "../../src/bus/global" // Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows) const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip @@ -16,7 +16,6 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc // Helpers // --------------------------------------------------------------------------- -type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } } type WatcherEvent = { file: string; event: "add" | "change" | "unlink" } /** Run `body` with a live FileWatcher service. */ @@ -36,22 +35,17 @@ function withWatcher(directory: string, body: Effect.Effect) { function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) { let done = false - function on(evt: BusUpdate) { + const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => { if (done) return - if (evt.directory !== directory) return - if (evt.payload.type !== FileWatcher.Event.Updated.type) return - if (!check(evt.payload.properties)) return - hit(evt.payload.properties) - } + if (!check(evt.properties)) return + hit(evt.properties) + }) - function cleanup() { + return () => { if (done) return done = true - GlobalBus.off("event", on) + unsub() } - - GlobalBus.on("event", on) - return cleanup } function wait(directory: string, check: (evt: WatcherEvent) => boolean) {