diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index df44a3a229c..425dc6611b1 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -1,3 +1,4 @@ +import { Effect } from "effect" import { Log } from "@/util/log" import { Context } from "../util/context" import { Project } from "./project" @@ -5,6 +6,7 @@ import { State } from "./state" import { iife } from "@/util/iife" import { GlobalBus } from "@/bus/global" import { Filesystem } from "@/util/filesystem" +import * as InstanceState from "@/util/instance-state" interface Context { directory: string @@ -106,7 +108,7 @@ export const Instance = { async reload(input: { directory: string; init?: () => Promise; project?: Project.Info; worktree?: string }) { const directory = Filesystem.resolve(input.directory) Log.Default.info("reloading instance", { directory }) - await State.dispose(directory) + await Promise.all([State.dispose(directory), Effect.runPromise(InstanceState.dispose(directory))]) cache.delete(directory) const next = track(directory, boot({ ...input, directory })) emit(directory) @@ -114,7 +116,7 @@ export const Instance = { }, async dispose() { Log.Default.info("disposing instance", { directory: Instance.directory }) - await State.dispose(Instance.directory) + await Promise.all([State.dispose(Instance.directory), Effect.runPromise(InstanceState.dispose(Instance.directory))]) cache.delete(Instance.directory) emit(Instance.directory) }, diff --git a/packages/opencode/src/provider/auth-service.ts b/packages/opencode/src/provider/auth-service.ts index a97ce1840c6..951a1dba483 100644 --- a/packages/opencode/src/provider/auth-service.ts +++ b/packages/opencode/src/provider/auth-service.ts @@ -1,23 +1,14 @@ -import { Effect, Layer, ServiceMap } from "effect" +import { Effect, Layer, Record, ServiceMap, Struct } from "effect" import { Instance } from "@/project/instance" import { Plugin } from "../plugin" -import { filter, fromEntries, map, mapValues, pipe } from "remeda" +import { filter, fromEntries, map, pipe } from "remeda" import type { AuthOuathResult } from "@opencode-ai/plugin" import { NamedError } from "@opencode-ai/util/error" import * as Auth from "@/auth/service" +import * as InstanceState from "@/util/instance-state" import { ProviderID } from "./schema" import z from "zod" -const state = Instance.state(async () => { - const methods = pipe( - await Plugin.list(), - filter((x) => x.auth?.provider !== undefined), - map((x) => [x.auth!.provider, x.auth!] as const), - fromEntries(), - ) - return { methods, pending: {} as Record } -}) - export type Method = { type: "oauth" | "api" label: string @@ -53,13 +44,20 @@ export type ProviderAuthError = export namespace ProviderAuthService { export interface Service { + /** Get available auth methods for each provider (e.g. OAuth, API key). */ readonly methods: () => Effect.Effect> + + /** Start an OAuth authorization flow for a provider. Returns the URL to redirect to. */ readonly authorize: (input: { providerID: ProviderID; method: number }) => Effect.Effect + + /** Complete an OAuth flow after the user has authorized. Exchanges the code/callback for credentials. */ readonly callback: (input: { providerID: ProviderID method: number code?: string }) => Effect.Effect + + /** Set an API key directly for a provider (no OAuth flow). */ readonly api: (input: { providerID: ProviderID; key: string }) => Effect.Effect } } @@ -71,35 +69,35 @@ export class ProviderAuthService extends ServiceMap.Service + Effect.promise(async () => { + const methods = pipe( + await Plugin.list(), + filter((x) => x.auth?.provider !== undefined), + map((x) => [x.auth!.provider, x.auth!] as const), + fromEntries(), + ) + return { methods, pending: {} as Record } + }), + }) - const methods = Effect.fn("ProviderAuthService.methods")(() => - Effect.promise(() => - state().then((x) => - mapValues(x.methods, (y) => - y.methods.map( - (z): Method => ({ - type: z.type, - label: z.label, - }), - ), - ), - ), - ), - ) + const methods = Effect.fn("ProviderAuthService.methods")(function* () { + const x = yield* InstanceState.get(state) + return Record.map(x.methods, (y) => y.methods.map((z): Method => Struct.pick(z, ["type", "label"]))) + }) const authorize = Effect.fn("ProviderAuthService.authorize")(function* (input: { providerID: ProviderID method: number }) { - const item = yield* Effect.promise(() => state().then((x) => x.methods[input.providerID])) - const method = item.methods[input.method] + const authHook = (yield* InstanceState.get(state)).methods[input.providerID] + const method = authHook.methods[input.method] if (method.type !== "oauth") return const result = yield* Effect.promise(() => method.authorize()) - yield* Effect.promise(() => - state().then((x) => { - x.pending[input.providerID] = result - }), - ) + + const s = yield* InstanceState.get(state) + s.pending[input.providerID] = result return { url: result.url, method: result.method, @@ -112,17 +110,15 @@ export class ProviderAuthService extends ServiceMap.Service state().then((x) => x.pending[input.providerID])) + const match = (yield* InstanceState.get(state)).pending[input.providerID] if (!match) return yield* Effect.fail(new OauthMissing({ providerID: input.providerID })) - const result = - match.method === "code" - ? yield* Effect.gen(function* () { - const code = input.code - if (!code) return yield* Effect.fail(new OauthCodeMissing({ providerID: input.providerID })) - return yield* Effect.promise(() => match.callback(code)) - }) - : yield* Effect.promise(() => match.callback()) + if (match.method === "code" && !input.code) + return yield* Effect.fail(new OauthCodeMissing({ providerID: input.providerID })) + + const result = yield* Effect.promise(() => + match.method === "code" ? match.callback(input.code!) : match.callback(), + ) if (!result || result.type !== "success") return yield* Effect.fail(new OauthCallbackFailed({})) diff --git a/packages/opencode/src/provider/auth.ts b/packages/opencode/src/provider/auth.ts index bc53d874c4c..d513085392e 100644 --- a/packages/opencode/src/provider/auth.ts +++ b/packages/opencode/src/provider/auth.ts @@ -5,6 +5,9 @@ import { fn } from "@/util/fn" import * as S from "./auth-service" import { ProviderID } from "./schema" +// Separate runtime: ProviderAuthService can't join the shared runtime because +// runtime.ts → auth-service.ts → provider/auth.ts creates a circular import. +// AuthService is stateless file I/O so the duplicate instance is harmless. const rt = ManagedRuntime.make(S.ProviderAuthService.defaultLayer) function runPromise(f: (service: S.ProviderAuthService.Service) => Effect.Effect) { diff --git a/packages/opencode/src/util/instance-state.ts b/packages/opencode/src/util/instance-state.ts new file mode 100644 index 00000000000..7d6a6d9768d --- /dev/null +++ b/packages/opencode/src/util/instance-state.ts @@ -0,0 +1,48 @@ +import { Effect, ScopedCache, Scope } from "effect" + +import { Instance } from "@/project/instance" + +const TypeId = Symbol.for("@opencode/InstanceState") + +type Task = (key: string) => Effect.Effect + +const tasks = new Set() + +export interface InstanceState { + readonly [TypeId]: typeof TypeId + readonly cache: ScopedCache.ScopedCache +} + +export const make = (input: { + lookup: (key: string) => Effect.Effect + release?: (value: A, key: string) => Effect.Effect +}): Effect.Effect, never, R | Scope.Scope> => + Effect.gen(function* () { + const cache = yield* ScopedCache.make({ + capacity: Number.POSITIVE_INFINITY, + lookup: (key) => + Effect.acquireRelease(input.lookup(key), (value) => (input.release ? input.release(value, key) : Effect.void)), + }) + + const task: Task = (key) => ScopedCache.invalidate(cache, key) + tasks.add(task) + yield* Effect.addFinalizer(() => Effect.sync(() => void tasks.delete(task))) + + return { + [TypeId]: TypeId, + cache, + } + }) + +export const get = (self: InstanceState) => ScopedCache.get(self.cache, Instance.directory) + +export const has = (self: InstanceState) => ScopedCache.has(self.cache, Instance.directory) + +export const invalidate = (self: InstanceState) => + ScopedCache.invalidate(self.cache, Instance.directory) + +export const dispose = (key: string) => + Effect.all( + [...tasks].map((task) => task(key)), + { concurrency: "unbounded" }, + ) diff --git a/packages/opencode/test/project/state.test.ts b/packages/opencode/test/project/state.test.ts new file mode 100644 index 00000000000..314343d671c --- /dev/null +++ b/packages/opencode/test/project/state.test.ts @@ -0,0 +1,87 @@ +import { expect, test } from "bun:test" + +import { State } from "../../src/project/state" + +test("State.create caches values for the same key", async () => { + let key = "a" + let n = 0 + const state = State.create( + () => key, + () => ({ n: ++n }), + ) + + const a = state() + const b = state() + + expect(a).toBe(b) + expect(n).toBe(1) + + await State.dispose("a") +}) + +test("State.create isolates values by key", async () => { + let key = "a" + let n = 0 + const state = State.create( + () => key, + () => ({ n: ++n }), + ) + + const a = state() + key = "b" + const b = state() + key = "a" + const c = state() + + expect(a).toBe(c) + expect(a).not.toBe(b) + expect(n).toBe(2) + + await State.dispose("a") + await State.dispose("b") +}) + +test("State.dispose clears a key and runs cleanup", async () => { + const seen: string[] = [] + let key = "a" + let n = 0 + const state = State.create( + () => key, + () => ({ n: ++n }), + async (value) => { + seen.push(String(value.n)) + }, + ) + + const a = state() + await State.dispose("a") + const b = state() + + expect(a).not.toBe(b) + expect(seen).toEqual(["1"]) + + await State.dispose("a") +}) + +test("State.create dedupes concurrent promise initialization", async () => { + const gate = Promise.withResolvers() + let n = 0 + const state = State.create( + () => "a", + async () => { + n += 1 + await gate.promise + return { n } + }, + ) + + const task = Promise.all([state(), state()]) + await Promise.resolve() + expect(n).toBe(1) + + gate.resolve() + const [a, b] = await task + expect(a).toBe(b) + + await State.dispose("a") +}) diff --git a/packages/opencode/test/util/instance-state.test.ts b/packages/opencode/test/util/instance-state.test.ts new file mode 100644 index 00000000000..bd2df4bbf64 --- /dev/null +++ b/packages/opencode/test/util/instance-state.test.ts @@ -0,0 +1,139 @@ +import { afterEach, expect, test } from "bun:test" +import { Effect } from "effect" + +import { Instance } from "../../src/project/instance" +import * as InstanceState from "../../src/util/instance-state" +import { tmpdir } from "../fixture/fixture" + +async function access(state: InstanceState.InstanceState, dir: string) { + return Instance.provide({ + directory: dir, + fn: () => Effect.runPromise(InstanceState.get(state)), + }) +} + +afterEach(async () => { + await Instance.disposeAll() +}) + +test("InstanceState caches values for the same instance", async () => { + await using tmp = await tmpdir() + let n = 0 + + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const state = yield* InstanceState.make({ + lookup: () => Effect.sync(() => ({ n: ++n })), + }) + + const a = yield* Effect.promise(() => access(state, tmp.path)) + const b = yield* Effect.promise(() => access(state, tmp.path)) + + expect(a).toBe(b) + expect(n).toBe(1) + }), + ), + ) +}) + +test("InstanceState isolates values by directory", async () => { + await using a = await tmpdir() + await using b = await tmpdir() + let n = 0 + + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const state = yield* InstanceState.make({ + lookup: (dir) => Effect.sync(() => ({ dir, n: ++n })), + }) + + const x = yield* Effect.promise(() => access(state, a.path)) + const y = yield* Effect.promise(() => access(state, b.path)) + const z = yield* Effect.promise(() => access(state, a.path)) + + expect(x).toBe(z) + expect(x).not.toBe(y) + expect(n).toBe(2) + }), + ), + ) +}) + +test("InstanceState is disposed on instance reload", async () => { + await using tmp = await tmpdir() + const seen: string[] = [] + let n = 0 + + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const state = yield* InstanceState.make({ + lookup: () => Effect.sync(() => ({ n: ++n })), + release: (value) => + Effect.sync(() => { + seen.push(String(value.n)) + }), + }) + + const a = yield* Effect.promise(() => access(state, tmp.path)) + yield* Effect.promise(() => Instance.reload({ directory: tmp.path })) + const b = yield* Effect.promise(() => access(state, tmp.path)) + + expect(a).not.toBe(b) + expect(seen).toEqual(["1"]) + }), + ), + ) +}) + +test("InstanceState is disposed on disposeAll", async () => { + await using a = await tmpdir() + await using b = await tmpdir() + const seen: string[] = [] + + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const state = yield* InstanceState.make({ + lookup: (dir) => Effect.sync(() => ({ dir })), + release: (value) => + Effect.sync(() => { + seen.push(value.dir) + }), + }) + + yield* Effect.promise(() => access(state, a.path)) + yield* Effect.promise(() => access(state, b.path)) + yield* Effect.promise(() => Instance.disposeAll()) + + expect(seen.sort()).toEqual([a.path, b.path].sort()) + }), + ), + ) +}) + +test("InstanceState dedupes concurrent lookups for the same directory", async () => { + await using tmp = await tmpdir() + let n = 0 + + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const state = yield* InstanceState.make({ + lookup: () => + Effect.promise(async () => { + n += 1 + await Bun.sleep(10) + return { n } + }), + }) + + const [a, b] = yield* Effect.promise(() => Promise.all([access(state, tmp.path), access(state, tmp.path)])) + expect(a).toBe(b) + expect(n).toBe(1) + }), + ), + ) +})